Mirror of https://github.com/moby/moby.git

Merge Container and State mutexes

Resolved all deadlocks and fixed the race between kill and
monitor.resetContainer.
Fixes #7600

Signed-off-by: Alexandr Morozov <lk4d4math@gmail.com>
Author: Alexandr Morozov
Date:   2014-08-28 15:39:27 +04:00
Commit: 517ba44e37 (parent d9f8d3ea9f)
GPG key ID: 59BF89FA47378873
4 changed files with 47 additions and 50 deletions
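
The shape of the fix, before the hunks: previously Container carried its own sync.Mutex while State had a separate sync.RWMutex, so the kill/stop paths and the restart monitor had two locks to coordinate, which is what the deadlocks and the kill vs. monitor.resetContainer race traced back to. The commit merges them: Container embeds *State, State keeps a single sync.Mutex, and container methods and state updates serialize on that one lock. A minimal, self-contained sketch of the embedding pattern (illustrative names and fields, not the actual moby code):

package main

import (
	"fmt"
	"sync"
)

// State owns the only mutex. Embedding *State in Container promotes
// Lock/Unlock, so container.Lock() and state changes use the same lock.
type State struct {
	sync.Mutex
	Running bool
	Pid     int
}

type Container struct {
	*State        // one shared mutex instead of two independent ones
	ID     string // illustrative field
}

func main() {
	c := &Container{State: &State{}, ID: "abc123"}

	c.Lock() // this is State's mutex, reached through the embedded field
	c.Running = true
	c.Pid = 4242
	c.Unlock()

	fmt.Println(c.Running, c.Pid)
}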

Changed file 1 of 4: Container (embed *State; read state fields under the already-held lock)

@@ -10,7 +10,6 @@ import (
"path"
"path/filepath"
"strings"
-"sync"
"syscall"
"time"
@@ -43,7 +42,7 @@ var (
)
type Container struct {
-sync.Mutex
+*State
root string // Path to the "home" of the container, including metadata.
basefs string // Path to the graphdriver mountpoint
@@ -55,7 +54,6 @@ type Container struct {
Args []string
Config *runconfig.Config
-State *State
Image string
NetworkSettings *NetworkSettings
@@ -276,7 +274,7 @@ func (container *Container) Start() (err error) {
container.Lock()
defer container.Unlock()
-if container.State.IsRunning() {
+if container.State.Running {
return nil
}
@@ -526,11 +524,11 @@ func (container *Container) KillSig(sig int) error {
defer container.Unlock()
// We could unpause the container for them rather than returning this error
-if container.State.IsPaused() {
+if container.State.Paused {
return fmt.Errorf("Container %s is paused. Unpause the container before stopping", container.ID)
}
-if !container.State.IsRunning() {
+if !container.State.Running {
return nil
}
@@ -541,7 +539,7 @@ func (container *Container) KillSig(sig int) error {
// if the container is currently restarting we do not need to send the signal
// to the process. Telling the monitor that it should exit on it's next event
// loop is enough
-if container.State.IsRestarting() {
+if container.State.Restarting {
return nil
}
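
The switch above from IsRunning()/IsPaused()/IsRestarting() to raw field reads is forced by the merge: Go's sync.Mutex is not reentrant, so a method that already holds container.Lock() would deadlock the moment it called an accessor that takes the same mutex again. A short sketch of the hazard and the convention the commit adopts (simplified types, not the real daemon code):

package main

import "sync"

type State struct {
	sync.Mutex
	Running bool
}

type Container struct {
	*State
}

// IsRunning locks, so it is only safe when the caller does NOT hold the lock.
func (s *State) IsRunning() bool {
	s.Lock()
	defer s.Unlock()
	return s.Running
}

// Start holds the shared lock for its whole body, so it must read the
// field directly; calling c.IsRunning() here would block forever on the
// non-reentrant mutex it already owns.
func (c *Container) Start() error {
	c.Lock()
	defer c.Unlock()

	if c.Running { // not c.IsRunning()
		return nil
	}
	c.Running = true
	return nil
}

func main() {
	c := &Container{State: &State{}}
	_ = c.Start()
	_ = c.IsRunning()
}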

Changed file 2 of 4: the Containers listing job (direct field reads under the container lock)

@@ -70,7 +70,7 @@ func (daemon *Daemon) Containers(job *engine.Job) engine.Status {
writeCont := func(container *Container) error {
container.Lock()
defer container.Unlock()
-if !container.State.IsRunning() && !all && n <= 0 && since == "" && before == "" {
+if !container.State.Running && !all && n <= 0 && since == "" && before == "" {
return nil
}
if before != "" && !foundBefore {
@@ -87,7 +87,7 @@ func (daemon *Daemon) Containers(job *engine.Job) engine.Status {
return errLast
}
}
-if len(filt_exited) > 0 && !container.State.IsRunning() {
+if len(filt_exited) > 0 && !container.State.Running {
should_skip := true
for _, code := range filt_exited {
if code == container.State.GetExitCode() {
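
The listing job applies the same convention: writeCont takes each container's lock once and then filters on the fields directly while it is held. A rough sketch of that shape (the filt_exited handling is condensed, and the types and ExitCode placement are illustrative, not the real daemon structs):

package main

import (
	"fmt"
	"sync"
)

type State struct {
	sync.Mutex
	Running  bool
	ExitCode int
}

type Container struct {
	*State
	ID string
}

// filterExited keeps stopped containers whose exit code matches one of the
// requested codes, reading state fields under the per-container lock.
func filterExited(containers []*Container, exited []int) []*Container {
	var out []*Container
	for _, c := range containers {
		c.Lock()
		keep := false
		if !c.Running {
			for _, code := range exited {
				if code == c.ExitCode {
					keep = true
					break
				}
			}
		}
		c.Unlock()
		if keep {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	cs := []*Container{
		{State: &State{ExitCode: 1}, ID: "a"},
		{State: &State{Running: true}, ID: "b"},
	}
	for _, c := range filterExited(cs, []int{1}) {
		fmt.Println(c.ID) // prints "a"
	}
}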

Changed file 3 of 4: the container monitor (resetContainer gains a lock flag; unexported state setters used under the lock)

@@ -110,7 +110,7 @@ func (m *containerMonitor) Start() error {
defer func() {
if afterRun {
m.container.Lock()
-m.container.State.SetStopped(exitStatus)
+m.container.State.setStopped(exitStatus)
defer m.container.Unlock()
}
m.Close()
@@ -123,7 +123,7 @@ func (m *containerMonitor) Start() error {
m.container.RestartCount++
if err := m.container.startLoggingToDisk(); err != nil {
-m.resetContainer()
+m.resetContainer(false)
return err
}
@@ -138,7 +138,7 @@ func (m *containerMonitor) Start() error {
// if we receive an internal error from the initial start of a container then lets
// return it instead of entering the restart loop
if m.container.RestartCount == 0 {
-m.resetContainer()
+m.resetContainer(false)
return err
}
@@ -154,7 +154,7 @@ func (m *containerMonitor) Start() error {
if m.shouldRestart(exitStatus) {
m.container.State.SetRestarting(exitStatus)
m.container.LogEvent("die")
-m.resetContainer()
+m.resetContainer(true)
// sleep with a small time increment between each restart to help avoid issues cased by quickly
// restarting the container because of some types of errors ( networking cut out, etc... )
@@ -168,7 +168,7 @@ func (m *containerMonitor) Start() error {
continue
}
m.container.LogEvent("die")
-m.resetContainer()
+m.resetContainer(true)
return err
}
}
@@ -243,7 +243,7 @@ func (m *containerMonitor) callback(command *execdriver.Command) {
}
}
-m.container.State.SetRunning(command.Pid())
+m.container.State.setRunning(command.Pid())
// signal that the process has started
// close channel only if not closed
@@ -260,8 +260,13 @@ func (m *containerMonitor) callback(command *execdriver.Command) {
// resetContainer resets the container's IO and ensures that the command is able to be executed again
// by copying the data into a new struct
-func (m *containerMonitor) resetContainer() {
+// if lock is true, then container locked during reset
+func (m *containerMonitor) resetContainer(lock bool) {
container := m.container
+if lock {
+container.Lock()
+defer container.Unlock()
+}
if container.Config.OpenStdin {
if err := container.stdin.Close(); err != nil {
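
resetContainer growing a lock parameter is the race fix named in the commit message: the restart and die paths pass true, so the I/O teardown happens under the container lock and cannot interleave with a concurrent kill (which also locks the container), while the early failure paths pass false and skip the lock. A sketch of that conditional-locking shape with illustrative fields (the real resetContainer closes the container's stdin and rebuilds the command, as its comment says):

package main

import (
	"io"
	"strings"
	"sync"
)

type container struct {
	sync.Mutex
	stdin io.ReadCloser
}

type monitor struct {
	container *container
}

// resetContainer tears down per-run resources. With lock=true the container
// lock is held for the whole reset, so a concurrent kill (which also takes
// the container lock) can never observe a half-reset container.
func (m *monitor) resetContainer(lock bool) {
	c := m.container
	if lock {
		c.Lock()
		defer c.Unlock()
	}
	if c.stdin != nil {
		c.stdin.Close()
		c.stdin = nil
	}
}

func main() {
	m := &monitor{container: &container{stdin: io.NopCloser(strings.NewReader(""))}}

	m.resetContainer(true) // lock not held by the caller: take it inside

	m.container.Lock() // a caller already inside a locked section passes false
	m.resetContainer(false)
	m.container.Unlock()
}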

Changed file 4 of 4: State (single sync.Mutex, exported locking wrappers, unexported locked helpers)

@@ -1,7 +1,6 @@
package daemon
import (
-"encoding/json"
"fmt"
"sync"
"time"
@@ -10,7 +9,7 @@ import (
)
type State struct {
-sync.RWMutex
+sync.Mutex
Running bool
Paused bool
Restarting bool
@@ -29,9 +28,6 @@ func NewState() *State {
// String returns a human-readable description of the state
func (s *State) String() string {
-s.RLock()
-defer s.RUnlock()
if s.Running {
if s.Paused {
return fmt.Sprintf("Up %s (Paused)", units.HumanDuration(time.Now().UTC().Sub(s.StartedAt)))
@@ -50,16 +46,6 @@ func (s *State) String() string {
return fmt.Sprintf("Exited (%d) %s ago", s.ExitCode, units.HumanDuration(time.Now().UTC().Sub(s.FinishedAt)))
}
-type jState State
-// MarshalJSON for state is needed to avoid race conditions on inspect
-func (s *State) MarshalJSON() ([]byte, error) {
-s.RLock()
-b, err := json.Marshal(jState(*s))
-s.RUnlock()
-return b, err
-}
func wait(waitChan <-chan struct{}, timeout time.Duration) error {
if timeout < 0 {
<-waitChan
@@ -77,14 +63,14 @@ func wait(waitChan <-chan struct{}, timeout time.Duration) error {
// immediatly. If you want wait forever you must supply negative timeout.
// Returns pid, that was passed to SetRunning
func (s *State) WaitRunning(timeout time.Duration) (int, error) {
-s.RLock()
-if s.IsRunning() {
+s.Lock()
+if s.Running {
pid := s.Pid
-s.RUnlock()
+s.Unlock()
return pid, nil
}
waitChan := s.waitChan
-s.RUnlock()
+s.Unlock()
if err := wait(waitChan, timeout); err != nil {
return -1, err
}
@@ -95,14 +81,14 @@ func (s *State) WaitRunning(timeout time.Duration) (int, error) {
// immediatly. If you want wait forever you must supply negative timeout.
// Returns exit code, that was passed to SetStopped
func (s *State) WaitStop(timeout time.Duration) (int, error) {
-s.RLock()
+s.Lock()
if !s.Running {
exitCode := s.ExitCode
-s.RUnlock()
+s.Unlock()
return exitCode, nil
}
waitChan := s.waitChan
-s.RUnlock()
+s.Unlock()
if err := wait(waitChan, timeout); err != nil {
return -1, err
}
@@ -110,28 +96,33 @@ func (s *State) WaitStop(timeout time.Duration) (int, error) {
}
func (s *State) IsRunning() bool {
-s.RLock()
+s.Lock()
res := s.Running
-s.RUnlock()
+s.Unlock()
return res
}
func (s *State) GetPid() int {
-s.RLock()
+s.Lock()
res := s.Pid
-s.RUnlock()
+s.Unlock()
return res
}
func (s *State) GetExitCode() int {
-s.RLock()
+s.Lock()
res := s.ExitCode
-s.RUnlock()
+s.Unlock()
return res
}
func (s *State) SetRunning(pid int) {
s.Lock()
+s.setRunning(pid)
+s.Unlock()
+}
+func (s *State) setRunning(pid int) {
s.Running = true
s.Paused = false
s.Restarting = false
@@ -140,11 +131,15 @@ func (s *State) SetRunning(pid int) {
s.StartedAt = time.Now().UTC()
close(s.waitChan) // fire waiters for start
s.waitChan = make(chan struct{})
-s.Unlock()
}
func (s *State) SetStopped(exitCode int) {
s.Lock()
+s.setStopped(exitCode)
+s.Unlock()
+}
+func (s *State) setStopped(exitCode int) {
s.Running = false
s.Restarting = false
s.Pid = 0
@@ -152,7 +147,6 @@ func (s *State) SetStopped(exitCode int) {
s.ExitCode = exitCode
close(s.waitChan) // fire waiters for stop
s.waitChan = make(chan struct{})
-s.Unlock()
}
// SetRestarting is when docker hanldes the auto restart of containers when they are
@@ -172,9 +166,9 @@ func (s *State) SetRestarting(exitCode int) {
}
func (s *State) IsRestarting() bool {
-s.RLock()
+s.Lock()
res := s.Restarting
-s.RUnlock()
+s.Unlock()
return res
}
@@ -191,8 +185,8 @@ func (s *State) SetUnpaused() {
}
func (s *State) IsPaused() bool {
-s.RLock()
+s.Lock()
res := s.Paused
-s.RUnlock()
+s.Unlock()
return res
}
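
The SetStopped/setStopped and SetRunning/setRunning pairs are the usual split between an exported method that takes the lock and an unexported twin that assumes the caller already holds it, which is what lets the monitor call setStopped inside its own m.container.Lock()/Unlock() window above. A compilable sketch of just that pattern, with a trimmed-down State (waitChan handling kept, the other fields omitted):

package main

import (
	"fmt"
	"sync"
	"time"
)

type State struct {
	sync.Mutex
	Running    bool
	Pid        int
	ExitCode   int
	FinishedAt time.Time
	waitChan   chan struct{}
}

// SetStopped is for callers that do not hold the lock.
func (s *State) SetStopped(exitCode int) {
	s.Lock()
	s.setStopped(exitCode)
	s.Unlock()
}

// setStopped assumes the caller already holds s.Mutex.
func (s *State) setStopped(exitCode int) {
	s.Running = false
	s.Pid = 0
	s.ExitCode = exitCode
	s.FinishedAt = time.Now().UTC()
	close(s.waitChan) // fire waiters for stop, as in the real State
	s.waitChan = make(chan struct{})
}

func main() {
	s := &State{waitChan: make(chan struct{})}

	s.SetStopped(0) // unlocked caller: the exported method locks for us

	s.Lock() // caller already inside a locked section: use the helper
	s.setStopped(137)
	s.Unlock()

	fmt.Println(s.ExitCode)
}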