mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			257 lines
		
	
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			257 lines
		
	
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package engine
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"container/ring"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"sync"
 | 
						|
)
 | 
						|
 | 
						|
type Output struct {
 | 
						|
	sync.Mutex
 | 
						|
	dests []io.Writer
 | 
						|
	tasks sync.WaitGroup
 | 
						|
	used  bool
 | 
						|
}
 | 
						|
 | 
						|
// NewOutput returns a new Output object with no destinations attached.
 | 
						|
// Writing to an empty Output will cause the written data to be discarded.
 | 
						|
func NewOutput() *Output {
 | 
						|
	return &Output{}
 | 
						|
}
 | 
						|
 | 
						|
// Return true if something was written on this output
 | 
						|
func (o *Output) Used() bool {
 | 
						|
	o.Lock()
 | 
						|
	defer o.Unlock()
 | 
						|
	return o.used
 | 
						|
}
 | 
						|
 | 
						|
// Add attaches a new destination to the Output. Any data subsequently written
 | 
						|
// to the output will be written to the new destination in addition to all the others.
 | 
						|
// This method is thread-safe.
 | 
						|
func (o *Output) Add(dst io.Writer) {
 | 
						|
	o.Lock()
 | 
						|
	defer o.Unlock()
 | 
						|
	o.dests = append(o.dests, dst)
 | 
						|
}
 | 
						|
 | 
						|
// Set closes and remove existing destination and then attaches a new destination to
 | 
						|
// the Output. Any data subsequently written to the output will be written to the new
 | 
						|
// destination in addition to all the others. This method is thread-safe.
 | 
						|
func (o *Output) Set(dst io.Writer) {
 | 
						|
	o.Close()
 | 
						|
	o.Lock()
 | 
						|
	defer o.Unlock()
 | 
						|
	o.dests = []io.Writer{dst}
 | 
						|
}
 | 
						|
 | 
						|
// AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
 | 
						|
// and returns its reading end for consumption by the caller.
 | 
						|
// This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
 | 
						|
// This method is thread-safe.
 | 
						|
func (o *Output) AddPipe() (io.Reader, error) {
 | 
						|
	r, w := io.Pipe()
 | 
						|
	o.Add(w)
 | 
						|
	return r, nil
 | 
						|
}
 | 
						|
 | 
						|
// AddTail starts a new goroutine which will read all subsequent data written to the output,
 | 
						|
// line by line, and append the last `n` lines to `dst`.
 | 
						|
func (o *Output) AddTail(dst *[]string, n int) error {
 | 
						|
	src, err := o.AddPipe()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	o.tasks.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer o.tasks.Done()
 | 
						|
		Tail(src, n, dst)
 | 
						|
	}()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// AddString starts a new goroutine which will read all subsequent data written to the output,
 | 
						|
// line by line, and store the last line into `dst`.
 | 
						|
func (o *Output) AddString(dst *string) error {
 | 
						|
	src, err := o.AddPipe()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	o.tasks.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer o.tasks.Done()
 | 
						|
		lines := make([]string, 0, 1)
 | 
						|
		Tail(src, 1, &lines)
 | 
						|
		if len(lines) == 0 {
 | 
						|
			*dst = ""
 | 
						|
		} else {
 | 
						|
			*dst = lines[0]
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Write writes the same data to all registered destinations.
 | 
						|
// This method is thread-safe.
 | 
						|
func (o *Output) Write(p []byte) (n int, err error) {
 | 
						|
	o.Lock()
 | 
						|
	defer o.Unlock()
 | 
						|
	o.used = true
 | 
						|
	var firstErr error
 | 
						|
	for _, dst := range o.dests {
 | 
						|
		_, err := dst.Write(p)
 | 
						|
		if err != nil && firstErr == nil {
 | 
						|
			firstErr = err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return len(p), firstErr
 | 
						|
}
 | 
						|
 | 
						|
// Close unregisters all destinations and waits for all background
 | 
						|
// AddTail and AddString tasks to complete.
 | 
						|
// The Close method of each destination is called if it exists.
 | 
						|
func (o *Output) Close() error {
 | 
						|
	o.Lock()
 | 
						|
	defer o.Unlock()
 | 
						|
	var firstErr error
 | 
						|
	for _, dst := range o.dests {
 | 
						|
		if closer, ok := dst.(io.WriteCloser); ok {
 | 
						|
			err := closer.Close()
 | 
						|
			if err != nil && firstErr == nil {
 | 
						|
				firstErr = err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	o.tasks.Wait()
 | 
						|
	return firstErr
 | 
						|
}
 | 
						|
 | 
						|
type Input struct {
 | 
						|
	src io.Reader
 | 
						|
	sync.Mutex
 | 
						|
}
 | 
						|
 | 
						|
// NewInput returns a new Input object with no source attached.
 | 
						|
// Reading to an empty Input will return io.EOF.
 | 
						|
func NewInput() *Input {
 | 
						|
	return &Input{}
 | 
						|
}
 | 
						|
 | 
						|
// Read reads from the input in a thread-safe way.
 | 
						|
func (i *Input) Read(p []byte) (n int, err error) {
 | 
						|
	i.Mutex.Lock()
 | 
						|
	defer i.Mutex.Unlock()
 | 
						|
	if i.src == nil {
 | 
						|
		return 0, io.EOF
 | 
						|
	}
 | 
						|
	return i.src.Read(p)
 | 
						|
}
 | 
						|
 | 
						|
// Closes the src
 | 
						|
// Not thread safe on purpose
 | 
						|
func (i *Input) Close() error {
 | 
						|
	if i.src != nil {
 | 
						|
		if closer, ok := i.src.(io.WriteCloser); ok {
 | 
						|
			return closer.Close()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Add attaches a new source to the input.
 | 
						|
// Add can only be called once per input. Subsequent calls will
 | 
						|
// return an error.
 | 
						|
func (i *Input) Add(src io.Reader) error {
 | 
						|
	i.Mutex.Lock()
 | 
						|
	defer i.Mutex.Unlock()
 | 
						|
	if i.src != nil {
 | 
						|
		return fmt.Errorf("Maximum number of sources reached: 1")
 | 
						|
	}
 | 
						|
	i.src = src
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Tail reads from `src` line per line, and returns the last `n` lines as an array.
 | 
						|
// A ring buffer is used to only store `n` lines at any time.
 | 
						|
func Tail(src io.Reader, n int, dst *[]string) {
 | 
						|
	scanner := bufio.NewScanner(src)
 | 
						|
	r := ring.New(n)
 | 
						|
	for scanner.Scan() {
 | 
						|
		if n == 0 {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		r.Value = scanner.Text()
 | 
						|
		r = r.Next()
 | 
						|
	}
 | 
						|
	r.Do(func(v interface{}) {
 | 
						|
		if v == nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		*dst = append(*dst, v.(string))
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// AddEnv starts a new goroutine which will decode all subsequent data
 | 
						|
// as a stream of json-encoded objects, and point `dst` to the last
 | 
						|
// decoded object.
 | 
						|
// The result `env` can be queried using the type-neutral Env interface.
 | 
						|
// It is not safe to query `env` until the Output is closed.
 | 
						|
func (o *Output) AddEnv() (dst *Env, err error) {
 | 
						|
	src, err := o.AddPipe()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	dst = &Env{}
 | 
						|
	o.tasks.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer o.tasks.Done()
 | 
						|
		decoder := NewDecoder(src)
 | 
						|
		for {
 | 
						|
			env, err := decoder.Decode()
 | 
						|
			if err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			*dst = *env
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return dst, nil
 | 
						|
}
 | 
						|
 | 
						|
func (o *Output) AddListTable() (dst *Table, err error) {
 | 
						|
	src, err := o.AddPipe()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	dst = NewTable("", 0)
 | 
						|
	o.tasks.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer o.tasks.Done()
 | 
						|
		content, err := ioutil.ReadAll(src)
 | 
						|
		if err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if _, err := dst.ReadListFrom(content); err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return dst, nil
 | 
						|
}
 | 
						|
 | 
						|
func (o *Output) AddTable() (dst *Table, err error) {
 | 
						|
	src, err := o.AddPipe()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	dst = NewTable("", 0)
 | 
						|
	o.tasks.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer o.tasks.Done()
 | 
						|
		if _, err := dst.ReadFrom(src); err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return dst, nil
 | 
						|
}
 |