Adding ability to change max download attempts
Moby works perfectly when you are in a situation when one has a good and stable internet connection. Operating in area's where internet connectivity is likely to be lost in undetermined intervals, like a satellite connection or 4G/LTE in rural area's, can become a problem when pulling a new image. When connection is lost while image layers are being pulled, Moby will try to reconnect up to 5 times. If this fails, the incompletely downloaded layers are lost will need to be completely downloaded again during the next pull request. This means that we are using more data than we might have to. Pulling a layer multiple times from the start can become costly over a satellite or 4G/LTE connection. As these techniques (especially 4G) quite common in IoT and Moby is used to run Azure IoT Edge devices, I would like to add a settable maximum download attempts. The maximum download attempts is currently set at 5 (distribution/xfer/download.go). I would like to change this constant to a variable that the user can set. The default will still be 5, so nothing will change from the current version unless specified when starting the daemon with the added flag or in the config file. I added a default value of 5 for DefaultMaxDownloadAttempts and a settable max-download-attempts in the daemon config file. It is also added to the config of dockerd so it can be set with a flag when starting the daemon. This value gets stored in the imageService of the daemon when it is initiated and can be passed to the NewLayerDownloadManager as a parameter. It will be stored in the LayerDownloadManager when initiated. This enables us to set the max amount of retries in makeDownoadFunc equal to the max download attempts. I also added some tests that are based on maxConcurrentDownloads/maxConcurrentUploads. You can pull this version and test in a development container. Either create a config `file /etc/docker/daemon.json` with `{"max-download-attempts"=3}``, or use `dockerd --max-download-attempts=3 -D &` to start up the dockerd. Start downloading a container and disconnect from the internet whilst downloading. The result would be that it stops pulling after three attempts. Signed-off-by: Lukas Heeren <lukas-heeren@hotmail.com> Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
parent
3acf0cc795
commit
ce61a1ed98
|
@ -20,7 +20,7 @@ const (
|
||||||
|
|
||||||
// installCommonConfigFlags adds flags to the pflag.FlagSet to configure the daemon
|
// installCommonConfigFlags adds flags to the pflag.FlagSet to configure the daemon
|
||||||
func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
|
func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
|
||||||
var maxConcurrentDownloads, maxConcurrentUploads int
|
var maxConcurrentDownloads, maxConcurrentUploads, maxDownloadAttempts int
|
||||||
defaultPidFile, err := getDefaultPidFile()
|
defaultPidFile, err := getDefaultPidFile()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -73,6 +73,7 @@ func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
|
||||||
flags.StringVar(&conf.CorsHeaders, "api-cors-header", "", "Set CORS headers in the Engine API")
|
flags.StringVar(&conf.CorsHeaders, "api-cors-header", "", "Set CORS headers in the Engine API")
|
||||||
flags.IntVar(&maxConcurrentDownloads, "max-concurrent-downloads", config.DefaultMaxConcurrentDownloads, "Set the max concurrent downloads for each pull")
|
flags.IntVar(&maxConcurrentDownloads, "max-concurrent-downloads", config.DefaultMaxConcurrentDownloads, "Set the max concurrent downloads for each pull")
|
||||||
flags.IntVar(&maxConcurrentUploads, "max-concurrent-uploads", config.DefaultMaxConcurrentUploads, "Set the max concurrent uploads for each push")
|
flags.IntVar(&maxConcurrentUploads, "max-concurrent-uploads", config.DefaultMaxConcurrentUploads, "Set the max concurrent uploads for each push")
|
||||||
|
flags.IntVar(&maxDownloadAttempts, "max-download-attempts", config.DefaultDownloadAttempts, "Set the max download attempts for each pull")
|
||||||
flags.IntVar(&conf.ShutdownTimeout, "shutdown-timeout", defaultShutdownTimeout, "Set the default shutdown timeout")
|
flags.IntVar(&conf.ShutdownTimeout, "shutdown-timeout", defaultShutdownTimeout, "Set the default shutdown timeout")
|
||||||
flags.IntVar(&conf.NetworkDiagnosticPort, "network-diagnostic-port", 0, "TCP port number of the network diagnostic server")
|
flags.IntVar(&conf.NetworkDiagnosticPort, "network-diagnostic-port", 0, "TCP port number of the network diagnostic server")
|
||||||
_ = flags.MarkHidden("network-diagnostic-port")
|
_ = flags.MarkHidden("network-diagnostic-port")
|
||||||
|
@ -87,6 +88,7 @@ func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
|
||||||
|
|
||||||
conf.MaxConcurrentDownloads = &maxConcurrentDownloads
|
conf.MaxConcurrentDownloads = &maxConcurrentDownloads
|
||||||
conf.MaxConcurrentUploads = &maxConcurrentUploads
|
conf.MaxConcurrentUploads = &maxConcurrentUploads
|
||||||
|
conf.MaxDownloadAttempts = &maxDownloadAttempts
|
||||||
|
|
||||||
flags.StringVar(&conf.ContainerdNamespace, "containerd-namespace", daemon.ContainersNamespace, "Containerd namespace to use")
|
flags.StringVar(&conf.ContainerdNamespace, "containerd-namespace", daemon.ContainersNamespace, "Containerd namespace to use")
|
||||||
if err := flags.MarkHidden("containerd-namespace"); err != nil {
|
if err := flags.MarkHidden("containerd-namespace"); err != nil {
|
||||||
|
|
|
@ -31,6 +31,10 @@ const (
|
||||||
// maximum number of uploads that
|
// maximum number of uploads that
|
||||||
// may take place at a time for each push.
|
// may take place at a time for each push.
|
||||||
DefaultMaxConcurrentUploads = 5
|
DefaultMaxConcurrentUploads = 5
|
||||||
|
// DefaultDownloadAttempts is the default value for
|
||||||
|
// maximum number of attempts that
|
||||||
|
// may take place at a time for each pull when the connection is lost.
|
||||||
|
DefaultDownloadAttempts = 5
|
||||||
// StockRuntimeName is the reserved name/alias used to represent the
|
// StockRuntimeName is the reserved name/alias used to represent the
|
||||||
// OCI runtime being shipped with the docker daemon package.
|
// OCI runtime being shipped with the docker daemon package.
|
||||||
StockRuntimeName = "runc"
|
StockRuntimeName = "runc"
|
||||||
|
@ -172,6 +176,10 @@ type CommonConfig struct {
|
||||||
// may take place at a time for each push.
|
// may take place at a time for each push.
|
||||||
MaxConcurrentUploads *int `json:"max-concurrent-uploads,omitempty"`
|
MaxConcurrentUploads *int `json:"max-concurrent-uploads,omitempty"`
|
||||||
|
|
||||||
|
// MaxDownloadAttempts is the maximum number of attempts that
|
||||||
|
// may take place at a time for each push.
|
||||||
|
MaxDownloadAttempts *int `json:"max-download-attempts,omitempty"`
|
||||||
|
|
||||||
// ShutdownTimeout is the timeout value (in seconds) the daemon will wait for the container
|
// ShutdownTimeout is the timeout value (in seconds) the daemon will wait for the container
|
||||||
// to stop when daemon is being shutdown
|
// to stop when daemon is being shutdown
|
||||||
ShutdownTimeout int `json:"shutdown-timeout,omitempty"`
|
ShutdownTimeout int `json:"shutdown-timeout,omitempty"`
|
||||||
|
@ -534,7 +542,7 @@ func findConfigurationConflicts(config map[string]interface{}, flags *pflag.Flag
|
||||||
|
|
||||||
// Validate validates some specific configs.
|
// Validate validates some specific configs.
|
||||||
// such as config.DNS, config.Labels, config.DNSSearch,
|
// such as config.DNS, config.Labels, config.DNSSearch,
|
||||||
// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads.
|
// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads and config.MaxDownloadAttempts.
|
||||||
func Validate(config *Config) error {
|
func Validate(config *Config) error {
|
||||||
// validate DNS
|
// validate DNS
|
||||||
for _, dns := range config.DNS {
|
for _, dns := range config.DNS {
|
||||||
|
@ -564,6 +572,9 @@ func Validate(config *Config) error {
|
||||||
if config.MaxConcurrentUploads != nil && *config.MaxConcurrentUploads < 0 {
|
if config.MaxConcurrentUploads != nil && *config.MaxConcurrentUploads < 0 {
|
||||||
return fmt.Errorf("invalid max concurrent uploads: %d", *config.MaxConcurrentUploads)
|
return fmt.Errorf("invalid max concurrent uploads: %d", *config.MaxConcurrentUploads)
|
||||||
}
|
}
|
||||||
|
if err := ValidateMaxDownloadAttempts(config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// validate that "default" runtime is not reset
|
// validate that "default" runtime is not reset
|
||||||
if runtimes := config.GetAllRuntimes(); len(runtimes) > 0 {
|
if runtimes := config.GetAllRuntimes(); len(runtimes) > 0 {
|
||||||
|
@ -587,6 +598,14 @@ func Validate(config *Config) error {
|
||||||
return config.ValidatePlatformConfig()
|
return config.ValidatePlatformConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ValidateMaxDownloadAttempts validates if the max-download-attempts is within the valid range
|
||||||
|
func ValidateMaxDownloadAttempts(config *Config) error {
|
||||||
|
if config.MaxDownloadAttempts != nil && *config.MaxDownloadAttempts <= 0 {
|
||||||
|
return fmt.Errorf("invalid max download attempts: %d", *config.MaxDownloadAttempts)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ModifiedDiscoverySettings returns whether the discovery configuration has been modified or not.
|
// ModifiedDiscoverySettings returns whether the discovery configuration has been modified or not.
|
||||||
func ModifiedDiscoverySettings(config *Config, backendType, advertise string, clusterOpts map[string]string) bool {
|
func ModifiedDiscoverySettings(config *Config, backendType, advertise string, clusterOpts map[string]string) bool {
|
||||||
if config.ClusterStore != backendType || config.ClusterAdvertise != advertise {
|
if config.ClusterStore != backendType || config.ClusterAdvertise != advertise {
|
||||||
|
|
|
@ -223,25 +223,33 @@ func TestValidateReservedNamespaceLabels(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestValidateConfigurationErrors(t *testing.T) {
|
func TestValidateConfigurationErrors(t *testing.T) {
|
||||||
minusNumber := -10
|
intPtr := func(i int) *int { return &i }
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
config *Config
|
name string
|
||||||
|
config *Config
|
||||||
|
expectedErr string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "single label without value",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
Labels: []string{"one"},
|
Labels: []string{"one"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "bad attribute format: one",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "multiple label without value",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
Labels: []string{"foo=bar", "one"},
|
Labels: []string{"foo=bar", "one"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "bad attribute format: one",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "single DNS, invalid IP-address",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
|
@ -249,8 +257,10 @@ func TestValidateConfigurationErrors(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "1.1.1.1o is not an ip address",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "multiple DNS, invalid IP-address",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
|
@ -258,8 +268,10 @@ func TestValidateConfigurationErrors(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "1.1.1.1o is not an ip address",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "single DNSSearch",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
|
@ -267,8 +279,10 @@ func TestValidateConfigurationErrors(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "123456 is not a valid domain",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "multiple DNSSearch",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
|
@ -276,58 +290,80 @@ func TestValidateConfigurationErrors(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "123456 is not a valid domain",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "negative max-concurrent-downloads",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
MaxConcurrentDownloads: &minusNumber,
|
MaxConcurrentDownloads: intPtr(-10),
|
||||||
// This is weird...
|
|
||||||
ValuesSet: map[string]interface{}{
|
|
||||||
"max-concurrent-downloads": -1,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "invalid max concurrent downloads: -10",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "negative max-concurrent-uploads",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
MaxConcurrentUploads: &minusNumber,
|
MaxConcurrentUploads: intPtr(-10),
|
||||||
// This is weird...
|
|
||||||
ValuesSet: map[string]interface{}{
|
|
||||||
"max-concurrent-uploads": -1,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "invalid max concurrent uploads: -10",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "negative max-download-attempts",
|
||||||
|
config: &Config{
|
||||||
|
CommonConfig: CommonConfig{
|
||||||
|
MaxDownloadAttempts: intPtr(-10),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedErr: "invalid max download attempts: -10",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zero max-download-attempts",
|
||||||
|
config: &Config{
|
||||||
|
CommonConfig: CommonConfig{
|
||||||
|
MaxDownloadAttempts: intPtr(0),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedErr: "invalid max download attempts: 0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "generic resource without =",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
NodeGenericResources: []string{"foo"},
|
NodeGenericResources: []string{"foo"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "could not parse GenericResource: incorrect term foo, missing '=' or malformed expression",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "generic resource mixed named and discrete",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
NodeGenericResources: []string{"foo=bar", "foo=1"},
|
NodeGenericResources: []string{"foo=bar", "foo=1"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedErr: "could not parse GenericResource: mixed discrete and named resources in expression 'foo=[bar 1]'",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
err := Validate(tc.config)
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
if err == nil {
|
err := Validate(tc.config)
|
||||||
t.Fatalf("expected error, got nil for config %v", tc.config)
|
assert.Error(t, err, tc.expectedErr)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestValidateConfiguration(t *testing.T) {
|
func TestValidateConfiguration(t *testing.T) {
|
||||||
minusNumber := 4
|
intPtr := func(i int) *int { return &i }
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
name string
|
||||||
config *Config
|
config *Config
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "with label",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
Labels: []string{"one=two"},
|
Labels: []string{"one=two"},
|
||||||
|
@ -335,6 +371,7 @@ func TestValidateConfiguration(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "with dns",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
|
@ -344,6 +381,7 @@ func TestValidateConfiguration(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "with dns-search",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
|
@ -353,28 +391,31 @@ func TestValidateConfiguration(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "with max-concurrent-downloads",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
MaxConcurrentDownloads: &minusNumber,
|
MaxConcurrentDownloads: intPtr(4),
|
||||||
// This is weird...
|
|
||||||
ValuesSet: map[string]interface{}{
|
|
||||||
"max-concurrent-downloads": -1,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "with max-concurrent-uploads",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
MaxConcurrentUploads: &minusNumber,
|
MaxConcurrentUploads: intPtr(4),
|
||||||
// This is weird...
|
|
||||||
ValuesSet: map[string]interface{}{
|
|
||||||
"max-concurrent-uploads": -1,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "with max-download-attempts",
|
||||||
|
config: &Config{
|
||||||
|
CommonConfig: CommonConfig{
|
||||||
|
MaxDownloadAttempts: intPtr(4),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "with multiple node generic resources",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
NodeGenericResources: []string{"foo=bar", "foo=baz"},
|
NodeGenericResources: []string{"foo=bar", "foo=baz"},
|
||||||
|
@ -382,6 +423,7 @@ func TestValidateConfiguration(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "with node generic resources",
|
||||||
config: &Config{
|
config: &Config{
|
||||||
CommonConfig: CommonConfig{
|
CommonConfig: CommonConfig{
|
||||||
NodeGenericResources: []string{"foo=1"},
|
NodeGenericResources: []string{"foo=1"},
|
||||||
|
@ -390,10 +432,10 @@ func TestValidateConfiguration(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
err := Validate(tc.config)
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
if err != nil {
|
err := Validate(tc.config)
|
||||||
t.Fatalf("expected no error, got error %v", err)
|
assert.NilError(t, err)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1048,6 +1048,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
|
||||||
LayerStores: layerStores,
|
LayerStores: layerStores,
|
||||||
MaxConcurrentDownloads: *config.MaxConcurrentDownloads,
|
MaxConcurrentDownloads: *config.MaxConcurrentDownloads,
|
||||||
MaxConcurrentUploads: *config.MaxConcurrentUploads,
|
MaxConcurrentUploads: *config.MaxConcurrentUploads,
|
||||||
|
MaxDownloadAttempts: *config.MaxDownloadAttempts,
|
||||||
ReferenceStore: rs,
|
ReferenceStore: rs,
|
||||||
RegistryService: registryService,
|
RegistryService: registryService,
|
||||||
TrustKey: trustKey,
|
TrustKey: trustKey,
|
||||||
|
|
|
@ -38,6 +38,7 @@ type ImageServiceConfig struct {
|
||||||
LayerStores map[string]layer.Store
|
LayerStores map[string]layer.Store
|
||||||
MaxConcurrentDownloads int
|
MaxConcurrentDownloads int
|
||||||
MaxConcurrentUploads int
|
MaxConcurrentUploads int
|
||||||
|
MaxDownloadAttempts int
|
||||||
ReferenceStore dockerreference.Store
|
ReferenceStore dockerreference.Store
|
||||||
RegistryService registry.Service
|
RegistryService registry.Service
|
||||||
TrustKey libtrust.PrivateKey
|
TrustKey libtrust.PrivateKey
|
||||||
|
@ -47,10 +48,11 @@ type ImageServiceConfig struct {
|
||||||
func NewImageService(config ImageServiceConfig) *ImageService {
|
func NewImageService(config ImageServiceConfig) *ImageService {
|
||||||
logrus.Debugf("Max Concurrent Downloads: %d", config.MaxConcurrentDownloads)
|
logrus.Debugf("Max Concurrent Downloads: %d", config.MaxConcurrentDownloads)
|
||||||
logrus.Debugf("Max Concurrent Uploads: %d", config.MaxConcurrentUploads)
|
logrus.Debugf("Max Concurrent Uploads: %d", config.MaxConcurrentUploads)
|
||||||
|
logrus.Debugf("Max Download Attempts: %d", config.MaxDownloadAttempts)
|
||||||
return &ImageService{
|
return &ImageService{
|
||||||
containers: config.ContainerStore,
|
containers: config.ContainerStore,
|
||||||
distributionMetadataStore: config.DistributionMetadataStore,
|
distributionMetadataStore: config.DistributionMetadataStore,
|
||||||
downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads),
|
downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)),
|
||||||
eventsService: config.EventsService,
|
eventsService: config.EventsService,
|
||||||
imageStore: config.ImageStore,
|
imageStore: config.ImageStore,
|
||||||
layerStores: config.LayerStores,
|
layerStores: config.LayerStores,
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
// - Daemon debug log level
|
// - Daemon debug log level
|
||||||
// - Daemon max concurrent downloads
|
// - Daemon max concurrent downloads
|
||||||
// - Daemon max concurrent uploads
|
// - Daemon max concurrent uploads
|
||||||
|
// - Daemon max download attempts
|
||||||
// - Daemon shutdown timeout (in seconds)
|
// - Daemon shutdown timeout (in seconds)
|
||||||
// - Cluster discovery (reconfigure and restart)
|
// - Cluster discovery (reconfigure and restart)
|
||||||
// - Daemon labels
|
// - Daemon labels
|
||||||
|
@ -44,6 +45,9 @@ func (daemon *Daemon) Reload(conf *config.Config) (err error) {
|
||||||
}
|
}
|
||||||
daemon.reloadDebug(conf, attributes)
|
daemon.reloadDebug(conf, attributes)
|
||||||
daemon.reloadMaxConcurrentDownloadsAndUploads(conf, attributes)
|
daemon.reloadMaxConcurrentDownloadsAndUploads(conf, attributes)
|
||||||
|
if err := daemon.reloadMaxDownloadAttempts(conf, attributes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
daemon.reloadShutdownTimeout(conf, attributes)
|
daemon.reloadShutdownTimeout(conf, attributes)
|
||||||
daemon.reloadFeatures(conf, attributes)
|
daemon.reloadFeatures(conf, attributes)
|
||||||
|
|
||||||
|
@ -110,6 +114,27 @@ func (daemon *Daemon) reloadMaxConcurrentDownloadsAndUploads(conf *config.Config
|
||||||
attributes["max-concurrent-uploads"] = fmt.Sprintf("%d", *daemon.configStore.MaxConcurrentUploads)
|
attributes["max-concurrent-uploads"] = fmt.Sprintf("%d", *daemon.configStore.MaxConcurrentUploads)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reloadMaxDownloadAttempts updates configuration with max concurrent
|
||||||
|
// download attempts when a connection is lost and updates the passed attributes
|
||||||
|
func (daemon *Daemon) reloadMaxDownloadAttempts(conf *config.Config, attributes map[string]string) error {
|
||||||
|
if err := config.ValidateMaxDownloadAttempts(conf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no value is set for max-download-attempts we assume it is the default value
|
||||||
|
// We always "reset" as the cost is lightweight and easy to maintain.
|
||||||
|
maxDownloadAttempts := config.DefaultDownloadAttempts
|
||||||
|
if conf.IsValueSet("max-download-attempts") && conf.MaxDownloadAttempts != nil {
|
||||||
|
maxDownloadAttempts = *conf.MaxDownloadAttempts
|
||||||
|
}
|
||||||
|
daemon.configStore.MaxDownloadAttempts = &maxDownloadAttempts
|
||||||
|
logrus.Debugf("Reset Max Download Attempts: %d", *daemon.configStore.MaxDownloadAttempts)
|
||||||
|
|
||||||
|
// prepare reload event attributes with updatable configurations
|
||||||
|
attributes["max-download-attempts"] = fmt.Sprintf("%d", *daemon.configStore.MaxDownloadAttempts)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// reloadShutdownTimeout updates configuration with daemon shutdown timeout option
|
// reloadShutdownTimeout updates configuration with daemon shutdown timeout option
|
||||||
// and updates the passed attributes
|
// and updates the passed attributes
|
||||||
func (daemon *Daemon) reloadShutdownTimeout(conf *config.Config, attributes map[string]string) {
|
func (daemon *Daemon) reloadShutdownTimeout(conf *config.Config, attributes map[string]string) {
|
||||||
|
|
|
@ -24,9 +24,10 @@ const maxDownloadAttempts = 5
|
||||||
// registers and downloads those, taking into account dependencies between
|
// registers and downloads those, taking into account dependencies between
|
||||||
// layers.
|
// layers.
|
||||||
type LayerDownloadManager struct {
|
type LayerDownloadManager struct {
|
||||||
layerStores map[string]layer.Store
|
layerStores map[string]layer.Store
|
||||||
tm TransferManager
|
tm TransferManager
|
||||||
waitDuration time.Duration
|
waitDuration time.Duration
|
||||||
|
maxDownloadAttempts int
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetConcurrency sets the max concurrent downloads for each pull
|
// SetConcurrency sets the max concurrent downloads for each pull
|
||||||
|
@ -37,9 +38,10 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
|
||||||
// NewLayerDownloadManager returns a new LayerDownloadManager.
|
// NewLayerDownloadManager returns a new LayerDownloadManager.
|
||||||
func NewLayerDownloadManager(layerStores map[string]layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
|
func NewLayerDownloadManager(layerStores map[string]layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
|
||||||
manager := LayerDownloadManager{
|
manager := LayerDownloadManager{
|
||||||
layerStores: layerStores,
|
layerStores: layerStores,
|
||||||
tm: NewTransferManager(concurrencyLimit),
|
tm: NewTransferManager(concurrencyLimit),
|
||||||
waitDuration: time.Second,
|
waitDuration: time.Second,
|
||||||
|
maxDownloadAttempts: maxDownloadAttempts,
|
||||||
}
|
}
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
option(&manager)
|
option(&manager)
|
||||||
|
@ -47,6 +49,14 @@ func NewLayerDownloadManager(layerStores map[string]layer.Store, concurrencyLimi
|
||||||
return &manager
|
return &manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMaxDownloadAttempts configures the maximum number of download
|
||||||
|
// attempts for a download manager.
|
||||||
|
func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) {
|
||||||
|
return func(dlm *LayerDownloadManager) {
|
||||||
|
dlm.maxDownloadAttempts = max
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type downloadTransfer struct {
|
type downloadTransfer struct {
|
||||||
Transfer
|
Transfer
|
||||||
|
|
||||||
|
@ -283,13 +293,13 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
|
||||||
}
|
}
|
||||||
|
|
||||||
retries++
|
retries++
|
||||||
if _, isDNR := err.(DoNotRetry); isDNR || retries == maxDownloadAttempts {
|
if _, isDNR := err.(DoNotRetry); isDNR || retries > ldm.maxDownloadAttempts {
|
||||||
logrus.Errorf("Download failed: %v", err)
|
logrus.Errorf("Download failed after %d attempts: %v", retries, err)
|
||||||
d.err = err
|
d.err = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Errorf("Download failed, retrying: %v", err)
|
logrus.Infof("Download failed, retrying (%d/%d): %v", retries, ldm.maxDownloadAttempts, err)
|
||||||
delay := retries * 5
|
delay := retries * 5
|
||||||
ticker := time.NewTicker(ldm.waitDuration)
|
ticker := time.NewTicker(ldm.waitDuration)
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/docker/docker/layer"
|
"github.com/docker/docker/layer"
|
||||||
"github.com/docker/docker/pkg/progress"
|
"github.com/docker/docker/pkg/progress"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
|
"gotest.tools/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxDownloadConcurrency = 3
|
const maxDownloadConcurrency = 3
|
||||||
|
@ -160,6 +161,7 @@ type mockDownloadDescriptor struct {
|
||||||
registeredDiffID layer.DiffID
|
registeredDiffID layer.DiffID
|
||||||
expectedDiffID layer.DiffID
|
expectedDiffID layer.DiffID
|
||||||
simulateRetries int
|
simulateRetries int
|
||||||
|
retries int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key returns the key used to deduplicate downloads.
|
// Key returns the key used to deduplicate downloads.
|
||||||
|
@ -213,9 +215,9 @@ func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if d.simulateRetries != 0 {
|
if d.retries < d.simulateRetries {
|
||||||
d.simulateRetries--
|
d.retries++
|
||||||
return nil, 0, errors.New("simulating retry")
|
return nil, 0, fmt.Errorf("simulating download attempt %d/%d", d.retries, d.simulateRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.mockTarStream(), 0, nil
|
return d.mockTarStream(), 0, nil
|
||||||
|
@ -359,3 +361,70 @@ func TestCancelledDownload(t *testing.T) {
|
||||||
close(progressChan)
|
close(progressChan)
|
||||||
<-progressDone
|
<-progressDone
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMaxDownloadAttempts(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
simulateRetries int
|
||||||
|
maxDownloadAttempts int
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "max-attempts=5, succeed at 2nd attempt",
|
||||||
|
simulateRetries: 2,
|
||||||
|
maxDownloadAttempts: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "max-attempts=5, succeed at 5th attempt",
|
||||||
|
simulateRetries: 5,
|
||||||
|
maxDownloadAttempts: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "max-attempts=5, fail at 6th attempt",
|
||||||
|
simulateRetries: 6,
|
||||||
|
maxDownloadAttempts: 5,
|
||||||
|
expectedErr: "simulating download attempt 5/6",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "max-attempts=0, fail after 1 attempt",
|
||||||
|
simulateRetries: 1,
|
||||||
|
maxDownloadAttempts: 0,
|
||||||
|
expectedErr: "simulating download attempt 1/1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
|
||||||
|
lsMap := make(map[string]layer.Store)
|
||||||
|
lsMap[runtime.GOOS] = layerStore
|
||||||
|
ldm := NewLayerDownloadManager(
|
||||||
|
lsMap,
|
||||||
|
maxDownloadConcurrency,
|
||||||
|
func(m *LayerDownloadManager) {
|
||||||
|
m.waitDuration = time.Millisecond
|
||||||
|
m.maxDownloadAttempts = tc.maxDownloadAttempts
|
||||||
|
})
|
||||||
|
|
||||||
|
progressChan := make(chan progress.Progress)
|
||||||
|
progressDone := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for range progressChan {
|
||||||
|
}
|
||||||
|
close(progressDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var currentDownloads int32
|
||||||
|
descriptors := downloadDescriptors(¤tDownloads)
|
||||||
|
descriptors[4].(*mockDownloadDescriptor).simulateRetries = tc.simulateRetries
|
||||||
|
|
||||||
|
_, _, err := ldm.Download(context.Background(), *image.NewRootFS(), runtime.GOOS, descriptors, progress.ChanOutput(progressChan))
|
||||||
|
if tc.expectedErr == "" {
|
||||||
|
assert.NilError(t, err)
|
||||||
|
} else {
|
||||||
|
assert.Error(t, err, tc.expectedErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue