1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Move exec store to its own package inside the daemon.

Remove double reference between containers and exec configurations by
keeping only the container id.

Signed-off-by: David Calavera <david.calavera@gmail.com>
This commit is contained in:
David Calavera 2015-11-20 17:35:16 -05:00
parent b5c507750f
commit 9ca2e4e81c
7 changed files with 174 additions and 135 deletions

View file

@ -14,6 +14,7 @@ import (
"github.com/opencontainers/runc/libcontainer/label" "github.com/opencontainers/runc/libcontainer/label"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver"
"github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/daemon/logger/jsonfilelog"
@ -62,7 +63,7 @@ type CommonContainer struct {
hostConfig *runconfig.HostConfig hostConfig *runconfig.HostConfig
command *execdriver.Command command *execdriver.Command
monitor *containerMonitor monitor *containerMonitor
execCommands *execStore execCommands *exec.Store
// logDriver for closing // logDriver for closing
logDriver logger.Logger logDriver logger.Logger
logCopier *logger.Copier logCopier *logger.Copier
@ -75,7 +76,7 @@ func newBaseContainer(id, root string) *Container {
CommonContainer: CommonContainer{ CommonContainer: CommonContainer{
ID: id, ID: id,
State: NewState(), State: NewState(),
execCommands: newExecStore(), execCommands: exec.NewStore(),
root: root, root: root,
MountPoints: make(map[string]*volume.MountPoint), MountPoints: make(map[string]*volume.MountPoint),
StreamConfig: runconfig.NewStreamConfig(), StreamConfig: runconfig.NewStreamConfig(),

View file

@ -22,6 +22,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/cliconfig" "github.com/docker/docker/cliconfig"
"github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/events"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver"
"github.com/docker/docker/daemon/execdriver/execdrivers" "github.com/docker/docker/daemon/execdriver/execdrivers"
"github.com/docker/docker/daemon/graphdriver" "github.com/docker/docker/daemon/graphdriver"
@ -106,7 +107,7 @@ type Daemon struct {
repository string repository string
sysInitPath string sysInitPath string
containers *contStore containers *contStore
execCommands *execStore execCommands *exec.Store
graph *graph.Graph graph *graph.Graph
repositories *graph.TagStore repositories *graph.TagStore
idIndex *truncindex.TruncIndex idIndex *truncindex.TruncIndex
@ -790,7 +791,7 @@ func NewDaemon(config *Config, registryService *registry.Service) (daemon *Daemo
d.ID = trustKey.PublicKey().KeyID() d.ID = trustKey.PublicKey().KeyID()
d.repository = daemonRepo d.repository = daemonRepo
d.containers = &contStore{s: make(map[string]*Container)} d.containers = &contStore{s: make(map[string]*Container)}
d.execCommands = newExecStore() d.execCommands = exec.NewStore()
d.graph = g d.graph = g
d.repositories = repositories d.repositories = repositories
d.idIndex = truncindex.NewTruncIndex([]string{}) d.idIndex = truncindex.NewTruncIndex([]string{})

View file

@ -3,91 +3,23 @@ package daemon
import ( import (
"io" "io"
"strings" "strings"
"sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver"
derr "github.com/docker/docker/errors" derr "github.com/docker/docker/errors"
"github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/stringutils" "github.com/docker/docker/pkg/stringutils"
"github.com/docker/docker/runconfig" "github.com/docker/docker/runconfig"
) )
// ExecConfig holds the configurations for execs. The Daemon keeps func (d *Daemon) registerExecCommand(container *Container, config *exec.Config) {
// track of both running and finished execs so that they can be
// examined both during and after completion.
type ExecConfig struct {
sync.Mutex
ID string
Running bool
ExitCode int
ProcessConfig *execdriver.ProcessConfig
OpenStdin bool
OpenStderr bool
OpenStdout bool
streamConfig *runconfig.StreamConfig
Container *Container
canRemove bool
// waitStart will be closed immediately after the exec is really started.
waitStart chan struct{}
}
type execStore struct {
s map[string]*ExecConfig
sync.RWMutex
}
func newExecStore() *execStore {
return &execStore{s: make(map[string]*ExecConfig, 0)}
}
func (e *execStore) Add(id string, ExecConfig *ExecConfig) {
e.Lock()
e.s[id] = ExecConfig
e.Unlock()
}
func (e *execStore) Get(id string) *ExecConfig {
e.RLock()
res := e.s[id]
e.RUnlock()
return res
}
func (e *execStore) Delete(id string) {
e.Lock()
delete(e.s, id)
e.Unlock()
}
func (e *execStore) List() []string {
var IDs []string
e.RLock()
for id := range e.s {
IDs = append(IDs, id)
}
e.RUnlock()
return IDs
}
func (ExecConfig *ExecConfig) resize(h, w int) error {
select {
case <-ExecConfig.waitStart:
case <-time.After(time.Second):
return derr.ErrorCodeExecResize.WithArgs(ExecConfig.ID)
}
return ExecConfig.ProcessConfig.Terminal.Resize(h, w)
}
func (d *Daemon) registerExecCommand(ExecConfig *ExecConfig) {
// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed. // Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
ExecConfig.Container.execCommands.Add(ExecConfig.ID, ExecConfig) container.execCommands.Add(config.ID, config)
// Storing execs in daemon for easy access via remote API. // Storing execs in daemon for easy access via remote API.
d.execCommands.Add(ExecConfig.ID, ExecConfig) d.execCommands.Add(config.ID, config)
} }
// ExecExists looks up the exec instance and returns a bool if it exists or not. // ExecExists looks up the exec instance and returns a bool if it exists or not.
@ -101,7 +33,7 @@ func (d *Daemon) ExecExists(name string) (bool, error) {
// getExecConfig looks up the exec instance by name. If the container associated // getExecConfig looks up the exec instance by name. If the container associated
// with the exec instance is stopped or paused, it will return an error. // with the exec instance is stopped or paused, it will return an error.
func (d *Daemon) getExecConfig(name string) (*ExecConfig, error) { func (d *Daemon) getExecConfig(name string) (*exec.Config, error) {
ec := d.execCommands.Get(name) ec := d.execCommands.Get(name)
// If the exec is found but its container is not in the daemon's list of // If the exec is found but its container is not in the daemon's list of
@ -110,22 +42,24 @@ func (d *Daemon) getExecConfig(name string) (*ExecConfig, error) {
// the user sees the same error now that they will after the // the user sees the same error now that they will after the
// 5 minute clean-up loop is run which erases old/dead execs. // 5 minute clean-up loop is run which erases old/dead execs.
if ec != nil && d.containers.Get(ec.Container.ID) != nil { if ec != nil {
if !ec.Container.IsRunning() { if container := d.containers.Get(ec.ContainerID); container != nil {
return nil, derr.ErrorCodeContainerNotRunning.WithArgs(ec.Container.ID, ec.Container.State.String()) if !container.IsRunning() {
return nil, derr.ErrorCodeContainerNotRunning.WithArgs(container.ID, container.State.String())
}
if container.isPaused() {
return nil, derr.ErrorCodeExecPaused.WithArgs(container.ID)
}
return ec, nil
} }
if ec.Container.isPaused() {
return nil, derr.ErrorCodeExecPaused.WithArgs(ec.Container.ID)
}
return ec, nil
} }
return nil, derr.ErrorCodeNoExecID.WithArgs(name) return nil, derr.ErrorCodeNoExecID.WithArgs(name)
} }
func (d *Daemon) unregisterExecCommand(ExecConfig *ExecConfig) { func (d *Daemon) unregisterExecCommand(container *Container, execConfig *exec.Config) {
ExecConfig.Container.execCommands.Delete(ExecConfig.ID) container.execCommands.Delete(execConfig.ID)
d.execCommands.Delete(ExecConfig.ID) d.execCommands.Delete(execConfig.ID)
} }
func (d *Daemon) getActiveContainer(name string) (*Container, error) { func (d *Daemon) getActiveContainer(name string) (*Container, error) {
@ -162,23 +96,18 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro
} }
setPlatformSpecificExecProcessConfig(config, container, processConfig) setPlatformSpecificExecProcessConfig(config, container, processConfig)
ExecConfig := &ExecConfig{ execConfig := exec.NewConfig()
ID: stringid.GenerateNonCryptoID(), execConfig.OpenStdin = config.AttachStdin
OpenStdin: config.AttachStdin, execConfig.OpenStdout = config.AttachStdout
OpenStdout: config.AttachStdout, execConfig.OpenStderr = config.AttachStderr
OpenStderr: config.AttachStderr, execConfig.ProcessConfig = processConfig
streamConfig: runconfig.NewStreamConfig(), execConfig.ContainerID = container.ID
ProcessConfig: processConfig,
Container: container,
Running: false,
waitStart: make(chan struct{}),
}
d.registerExecCommand(ExecConfig) d.registerExecCommand(container, execConfig)
d.LogContainerEvent(container, "exec_create: "+ExecConfig.ProcessConfig.Entrypoint+" "+strings.Join(ExecConfig.ProcessConfig.Arguments, " ")) d.LogContainerEvent(container, "exec_create: "+execConfig.ProcessConfig.Entrypoint+" "+strings.Join(execConfig.ProcessConfig.Arguments, " "))
return ExecConfig.ID, nil return execConfig.ID, nil
} }
// ContainerExecStart starts a previously set up exec instance. The // ContainerExecStart starts a previously set up exec instance. The
@ -202,8 +131,8 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
ec.Running = true ec.Running = true
ec.Unlock() ec.Unlock()
logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID) container := d.containers.Get(ec.ContainerID)
container := ec.Container logrus.Debugf("starting exec command %s in container %s", ec.ID, container.ID)
d.LogContainerEvent(container, "exec_start: "+ec.ProcessConfig.Entrypoint+" "+strings.Join(ec.ProcessConfig.Arguments, " ")) d.LogContainerEvent(container, "exec_start: "+ec.ProcessConfig.Entrypoint+" "+strings.Join(ec.ProcessConfig.Arguments, " "))
if ec.OpenStdin { if ec.OpenStdin {
@ -223,12 +152,12 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
} }
if ec.OpenStdin { if ec.OpenStdin {
ec.streamConfig.NewInputPipes() ec.NewInputPipes()
} else { } else {
ec.streamConfig.NewNopInputPipe() ec.NewNopInputPipe()
} }
attachErr := attach(ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr) attachErr := attach(ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
execErr := make(chan error) execErr := make(chan error)
@ -263,19 +192,19 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
} }
// Exec calls the underlying exec driver to run // Exec calls the underlying exec driver to run
func (d *Daemon) Exec(c *Container, ExecConfig *ExecConfig, pipes *execdriver.Pipes, startCallback execdriver.DriverCallback) (int, error) { func (d *Daemon) Exec(c *Container, execConfig *exec.Config, pipes *execdriver.Pipes, startCallback execdriver.DriverCallback) (int, error) {
hooks := execdriver.Hooks{ hooks := execdriver.Hooks{
Start: startCallback, Start: startCallback,
} }
exitStatus, err := d.execDriver.Exec(c.command, ExecConfig.ProcessConfig, pipes, hooks) exitStatus, err := d.execDriver.Exec(c.command, execConfig.ProcessConfig, pipes, hooks)
// On err, make sure we don't leave ExitCode at zero // On err, make sure we don't leave ExitCode at zero
if err != nil && exitStatus == 0 { if err != nil && exitStatus == 0 {
exitStatus = 128 exitStatus = 128
} }
ExecConfig.ExitCode = exitStatus execConfig.ExitCode = exitStatus
ExecConfig.Running = false execConfig.Running = false
return exitStatus, err return exitStatus, err
} }
@ -288,13 +217,13 @@ func (d *Daemon) execCommandGC() {
cleaned int cleaned int
liveExecCommands = d.containerExecIds() liveExecCommands = d.containerExecIds()
) )
for id, config := range d.execCommands.s { for id, config := range d.execCommands.Commands() {
if config.canRemove { if config.CanRemove {
cleaned++ cleaned++
d.execCommands.Delete(id) d.execCommands.Delete(id)
} else { } else {
if _, exists := liveExecCommands[id]; !exists { if _, exists := liveExecCommands[id]; !exists {
config.canRemove = true config.CanRemove = true
} }
} }
} }
@ -316,7 +245,7 @@ func (d *Daemon) containerExecIds() map[string]struct{} {
return ids return ids
} }
func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error { func (d *Daemon) containerExec(container *Container, ec *exec.Config) error {
container.Lock() container.Lock()
defer container.Unlock() defer container.Unlock()
@ -329,43 +258,35 @@ func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error {
c.Close() c.Close()
} }
} }
close(ec.waitStart) ec.Close()
return nil return nil
} }
// We use a callback here instead of a goroutine and an chan for // We use a callback here instead of a goroutine and an chan for
// synchronization purposes // synchronization purposes
cErr := promise.Go(func() error { return d.monitorExec(container, ec, callback) }) cErr := promise.Go(func() error { return d.monitorExec(container, ec, callback) })
return ec.Wait(cErr)
// Exec should not return until the process is actually running
select {
case <-ec.waitStart:
case err := <-cErr:
return err
}
return nil
} }
func (d *Daemon) monitorExec(container *Container, ExecConfig *ExecConfig, callback execdriver.DriverCallback) error { func (d *Daemon) monitorExec(container *Container, execConfig *exec.Config, callback execdriver.DriverCallback) error {
pipes := execdriver.NewPipes(ExecConfig.streamConfig.Stdin(), ExecConfig.streamConfig.Stdout(), ExecConfig.streamConfig.Stderr(), ExecConfig.OpenStdin) pipes := execdriver.NewPipes(execConfig.Stdin(), execConfig.Stdout(), execConfig.Stderr(), execConfig.OpenStdin)
exitCode, err := d.Exec(container, ExecConfig, pipes, callback) exitCode, err := d.Exec(container, execConfig, pipes, callback)
if err != nil { if err != nil {
logrus.Errorf("Error running command in existing container %s: %s", container.ID, err) logrus.Errorf("Error running command in existing container %s: %s", container.ID, err)
} }
logrus.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode) logrus.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode)
if err := ExecConfig.streamConfig.CloseStreams(); err != nil { if err := execConfig.CloseStreams(); err != nil {
logrus.Errorf("%s: %s", container.ID, err) logrus.Errorf("%s: %s", container.ID, err)
} }
if ExecConfig.ProcessConfig.Terminal != nil { if execConfig.ProcessConfig.Terminal != nil {
if err := ExecConfig.ProcessConfig.Terminal.Close(); err != nil { if err := execConfig.ProcessConfig.Terminal.Close(); err != nil {
logrus.Errorf("Error closing terminal while running in container %s: %s", container.ID, err) logrus.Errorf("Error closing terminal while running in container %s: %s", container.ID, err)
} }
} }
// remove the exec command from the container's store only and not the // remove the exec command from the container's store only and not the
// daemon's store so that the exec command can be inspected. // daemon's store so that the exec command can be inspected.
container.execCommands.Delete(ExecConfig.ID) container.execCommands.Delete(execConfig.ID)
return err return err
} }

115
daemon/exec/exec.go Normal file
View file

@ -0,0 +1,115 @@
package exec
import (
"sync"
"time"
"github.com/docker/docker/daemon/execdriver"
derr "github.com/docker/docker/errors"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/runconfig"
)
// Config holds the configurations for execs. The Daemon keeps
// track of both running and finished execs so that they can be
// examined both during and after completion.
type Config struct {
sync.Mutex
*runconfig.StreamConfig
ID string
Running bool
ExitCode int
ProcessConfig *execdriver.ProcessConfig
OpenStdin bool
OpenStderr bool
OpenStdout bool
CanRemove bool
ContainerID string
// waitStart will be closed immediately after the exec is really started.
waitStart chan struct{}
}
// NewConfig initializes the a new exec configuration
func NewConfig() *Config {
return &Config{
ID: stringid.GenerateNonCryptoID(),
StreamConfig: runconfig.NewStreamConfig(),
waitStart: make(chan struct{}),
}
}
// Store keeps track of the exec configurations.
type Store struct {
commands map[string]*Config
sync.RWMutex
}
// NewStore initializes a new exec store.
func NewStore() *Store {
return &Store{commands: make(map[string]*Config, 0)}
}
// Commands returns the exec configurations in the store.
func (e *Store) Commands() map[string]*Config {
return e.commands
}
// Add adds a new exec configuration to the store.
func (e *Store) Add(id string, Config *Config) {
e.Lock()
e.commands[id] = Config
e.Unlock()
}
// Get returns an exec configuration by its id.
func (e *Store) Get(id string) *Config {
e.RLock()
res := e.commands[id]
e.RUnlock()
return res
}
// Delete removes an exec configuration from the store.
func (e *Store) Delete(id string) {
e.Lock()
delete(e.commands, id)
e.Unlock()
}
// List returns the list of exec ids in the store.
func (e *Store) List() []string {
var IDs []string
e.RLock()
for id := range e.commands {
IDs = append(IDs, id)
}
e.RUnlock()
return IDs
}
// Wait waits until the exec process finishes or there is an error in the error channel.
func (c *Config) Wait(cErr chan error) error {
// Exec should not return until the process is actually running
select {
case <-c.waitStart:
case err := <-cErr:
return err
}
return nil
}
// Close closes the wait channel for the progress.
func (c *Config) Close() {
close(c.waitStart)
}
// Resize changes the size of the terminal for the exec process.
func (c *Config) Resize(h, w int) error {
select {
case <-c.waitStart:
case <-time.After(time.Second):
return derr.ErrorCodeExecResize.WithArgs(c.ID)
}
return c.ProcessConfig.Terminal.Resize(h, w)
}

View file

@ -6,6 +6,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/versions/v1p20" "github.com/docker/docker/api/types/versions/v1p20"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/network" "github.com/docker/docker/daemon/network"
) )
@ -159,7 +160,7 @@ func (daemon *Daemon) getInspectData(container *Container, size bool) (*types.Co
// ContainerExecInspect returns low-level information about the exec // ContainerExecInspect returns low-level information about the exec
// command. An error is returned if the exec cannot be found. // command. An error is returned if the exec cannot be found.
func (daemon *Daemon) ContainerExecInspect(id string) (*ExecConfig, error) { func (daemon *Daemon) ContainerExecInspect(id string) (*exec.Config, error) {
eConfig, err := daemon.getExecConfig(id) eConfig, err := daemon.getExecConfig(id)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -29,5 +29,5 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
return err return err
} }
return ExecConfig.resize(height, width) return ExecConfig.Resize(height, width)
} }

View file

@ -160,8 +160,8 @@ func (daemon *Daemon) Cleanup(container *Container) {
daemon.conditionalUnmountOnCleanup(container) daemon.conditionalUnmountOnCleanup(container)
for _, eConfig := range container.execCommands.s { for _, eConfig := range container.execCommands.Commands() {
daemon.unregisterExecCommand(eConfig) daemon.unregisterExecCommand(container, eConfig)
} }
if err := container.unmountVolumes(false); err != nil { if err := container.unmountVolumes(false); err != nil {