1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #32949 from dnephin/refactor-streamformatter-and-progress

Refacator pkg/streamformatter
This commit is contained in:
Aaron Lehmann 2017-05-03 09:05:26 -07:00 committed by GitHub
commit 08d7fad45d
15 changed files with 229 additions and 226 deletions

View file

@ -138,7 +138,6 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
output := ioutils.NewWriteFlusher(w)
defer output.Close()
sf := streamformatter.NewJSONStreamFormatter()
errf := func(err error) error {
if httputils.BoolValue(r, "q") && notVerboseBuffer.Len() > 0 {
output.Write(notVerboseBuffer.Bytes())
@ -148,7 +147,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
if !output.Flushed() {
return err
}
_, err = w.Write(sf.FormatError(err))
_, err = w.Write(streamformatter.FormatError(err))
if err != nil {
logrus.Warnf("could not write error response: %v", err)
}
@ -166,25 +165,22 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
errors.New("squash is only supported with experimental mode"))
}
// Currently, only used if context is from a remote url.
// Look at code in DetectContextFromRemoteURL for more information.
createProgressReader := func(in io.ReadCloser) io.ReadCloser {
progressOutput := sf.NewProgressOutput(output, true)
if buildOptions.SuppressOutput {
progressOutput = sf.NewProgressOutput(notVerboseBuffer, true)
}
return progress.NewProgressReader(in, progressOutput, r.ContentLength, "Downloading context", buildOptions.RemoteContext)
}
out := io.Writer(output)
if buildOptions.SuppressOutput {
out = notVerboseBuffer
}
// Currently, only used if context is from a remote url.
// Look at code in DetectContextFromRemoteURL for more information.
createProgressReader := func(in io.ReadCloser) io.ReadCloser {
progressOutput := streamformatter.NewJSONProgressOutput(out, true)
return progress.NewProgressReader(in, progressOutput, r.ContentLength, "Downloading context", buildOptions.RemoteContext)
}
imgID, err := br.backend.Build(ctx, backend.BuildConfig{
Source: r.Body,
Options: buildOptions,
ProgressWriter: buildProgressWriter(out, sf, createProgressReader),
ProgressWriter: buildProgressWriter(out, createProgressReader),
})
if err != nil {
return errf(err)
@ -193,8 +189,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
// Everything worked so if -q was provided the output from the daemon
// should be just the image ID and we'll print that to stdout.
if buildOptions.SuppressOutput {
stdout := &streamformatter.StdoutFormatter{Writer: output, StreamFormatter: sf}
fmt.Fprintln(stdout, imgID)
fmt.Fprintln(streamformatter.NewStdoutWriter(output), imgID)
}
return nil
}
@ -226,15 +221,13 @@ func (s *syncWriter) Write(b []byte) (count int, err error) {
return
}
func buildProgressWriter(out io.Writer, sf *streamformatter.StreamFormatter, createProgressReader func(io.ReadCloser) io.ReadCloser) backend.ProgressWriter {
func buildProgressWriter(out io.Writer, createProgressReader func(io.ReadCloser) io.ReadCloser) backend.ProgressWriter {
out = &syncWriter{w: out}
stdout := &streamformatter.StdoutFormatter{Writer: out, StreamFormatter: sf}
stderr := &streamformatter.StderrFormatter{Writer: out, StreamFormatter: sf}
return backend.ProgressWriter{
Output: out,
StdoutFormatter: stdout,
StderrFormatter: stderr,
StdoutFormatter: streamformatter.NewStdoutWriter(out),
StderrFormatter: streamformatter.NewStderrWriter(out),
ProgressReaderFunc: createProgressReader,
}
}

View file

@ -118,8 +118,7 @@ func (s *imageRouter) postImagesCreate(ctx context.Context, w http.ResponseWrite
if !output.Flushed() {
return err
}
sf := streamformatter.NewJSONStreamFormatter()
output.Write(sf.FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
@ -164,8 +163,7 @@ func (s *imageRouter) postImagesPush(ctx context.Context, w http.ResponseWriter,
if !output.Flushed() {
return err
}
sf := streamformatter.NewJSONStreamFormatter()
output.Write(sf.FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
}
@ -190,8 +188,7 @@ func (s *imageRouter) getImagesGet(ctx context.Context, w http.ResponseWriter, r
if !output.Flushed() {
return err
}
sf := streamformatter.NewJSONStreamFormatter()
output.Write(sf.FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
}
@ -207,7 +204,7 @@ func (s *imageRouter) postImagesLoad(ctx context.Context, w http.ResponseWriter,
output := ioutils.NewWriteFlusher(w)
defer output.Close()
if err := s.backend.LoadImage(r.Body, output, quiet); err != nil {
output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
}

View file

@ -121,7 +121,7 @@ func (pr *pluginRouter) upgradePlugin(ctx context.Context, w http.ResponseWriter
if !output.Flushed() {
return err
}
output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
@ -160,7 +160,7 @@ func (pr *pluginRouter) pullPlugin(ctx context.Context, w http.ResponseWriter, r
if !output.Flushed() {
return err
}
output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
@ -268,7 +268,7 @@ func (pr *pluginRouter) pushPlugin(ctx context.Context, w http.ResponseWriter, r
if !output.Flushed() {
return err
}
output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
output.Write(streamformatter.FormatError(err))
}
return nil
}

View file

@ -4,14 +4,13 @@ import (
"io"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/streamformatter"
)
// ProgressWriter is a data object to transport progress streams to the client
type ProgressWriter struct {
Output io.Writer
StdoutFormatter *streamformatter.StdoutFormatter
StderrFormatter *streamformatter.StderrFormatter
StdoutFormatter io.Writer
StderrFormatter io.Writer
ProgressReaderFunc func(io.ReadCloser) io.ReadCloser
}

View file

@ -287,8 +287,7 @@ func (b *Builder) download(srcURL string) (remote builder.Source, p string, err
return
}
stdoutFormatter := b.Stdout.(*streamformatter.StdoutFormatter)
progressOutput := stdoutFormatter.StreamFormatter.NewProgressOutput(stdoutFormatter.Writer, true)
progressOutput := streamformatter.NewJSONProgressOutput(b.Output, true)
progressReader := progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Downloading")
// Download and dump result to tmp file
// TODO: add filehash directly

View file

@ -269,7 +269,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
}
// Setup an upload progress bar
progressOutput := streamformatter.NewStreamFormatter().NewProgressOutput(progBuff, true)
progressOutput := streamformatter.NewProgressOutput(progBuff)
if !dockerCli.Out().IsTerminal() {
progressOutput = &lastProgressOutput{output: progressOutput}
}

View file

@ -154,7 +154,7 @@ func GetContextFromURL(out io.Writer, remoteURL, dockerfileName string) (io.Read
if err != nil {
return nil, "", errors.Errorf("unable to download remote context %s: %v", remoteURL, err)
}
progressOutput := streamformatter.NewStreamFormatter().NewProgressOutput(out, true)
progressOutput := streamformatter.NewProgressOutput(out)
// Pass the response body through a progress reader.
progReader := progress.NewProgressReader(response.Body, progressOutput, response.ContentLength, "", fmt.Sprintf("Downloading build context from remote url: %s", remoteURL))

View file

@ -62,7 +62,7 @@ func stateToProgress(state swarm.TaskState, rollback bool) int64 {
func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error {
defer progressWriter.Close()
progressOut := streamformatter.NewJSONStreamFormatter().NewProgressOutput(progressWriter, false)
progressOut := streamformatter.NewJSONProgressOutput(progressWriter, false)
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)

View file

@ -28,7 +28,6 @@ import (
// the repo and tag arguments, respectively.
func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error {
var (
sf = streamformatter.NewJSONStreamFormatter()
rc io.ReadCloser
resp *http.Response
newRef reference.Named
@ -72,8 +71,8 @@ func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string
if err != nil {
return err
}
outStream.Write(sf.FormatStatus("", "Downloading from %s", u))
progressOutput := sf.NewProgressOutput(outStream, true)
outStream.Write(streamformatter.FormatStatus("", "Downloading from %s", u))
progressOutput := streamformatter.NewJSONProgressOutput(outStream, true)
rc = progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Importing")
}
@ -129,6 +128,6 @@ func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string
}
daemon.LogImageEvent(id.String(), id.String(), "import")
outStream.Write(sf.FormatStatus("", id.String()))
outStream.Write(streamformatter.FormatStatus("", id.String()))
return nil
}

View file

@ -14,7 +14,7 @@ import (
// WriteDistributionProgress is a helper for writing progress from chan to JSON
// stream with an optional cancel function.
func WriteDistributionProgress(cancelFunc func(), outStream io.Writer, progressChan <-chan progress.Progress) {
progressOutput := streamformatter.NewJSONStreamFormatter().NewProgressOutput(outStream, false)
progressOutput := streamformatter.NewJSONProgressOutput(outStream, false)
operationCancelled := false
for prog := range progressChan {

View file

@ -26,14 +26,11 @@ import (
)
func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
var (
sf = streamformatter.NewJSONStreamFormatter()
progressOutput progress.Output
)
var progressOutput progress.Output
if !quiet {
progressOutput = sf.NewProgressOutput(outStream, false)
progressOutput = streamformatter.NewJSONProgressOutput(outStream, false)
}
outStream = &streamformatter.StdoutFormatter{Writer: outStream, StreamFormatter: streamformatter.NewJSONStreamFormatter()}
outStream = streamformatter.NewStdoutWriter(outStream)
tmpDir, err := ioutil.TempDir("", "docker-import-")
if err != nil {

View file

@ -10,91 +10,76 @@ import (
"github.com/docker/docker/pkg/progress"
)
// StreamFormatter formats a stream, optionally using JSON.
type StreamFormatter struct {
json bool
}
// NewStreamFormatter returns a simple StreamFormatter
func NewStreamFormatter() *StreamFormatter {
return &StreamFormatter{}
}
// NewJSONStreamFormatter returns a StreamFormatter configured to stream json
func NewJSONStreamFormatter() *StreamFormatter {
return &StreamFormatter{true}
}
const streamNewline = "\r\n"
var streamNewlineBytes = []byte(streamNewline)
type jsonProgressFormatter struct{}
// FormatStream formats the specified stream.
func (sf *StreamFormatter) FormatStream(str string) []byte {
if sf.json {
b, err := json.Marshal(&jsonmessage.JSONMessage{Stream: str})
if err != nil {
return sf.FormatError(err)
}
return append(b, streamNewlineBytes...)
}
return []byte(str + "\r")
func appendNewline(source []byte) []byte {
return append(source, []byte(streamNewline)...)
}
// FormatStatus formats the specified objects according to the specified format (and id).
func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
func FormatStatus(id, format string, a ...interface{}) []byte {
str := fmt.Sprintf(format, a...)
if sf.json {
b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str})
if err != nil {
return sf.FormatError(err)
}
return append(b, streamNewlineBytes...)
b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str})
if err != nil {
return FormatError(err)
}
return []byte(str + streamNewline)
return appendNewline(b)
}
// FormatError formats the specified error.
func (sf *StreamFormatter) FormatError(err error) []byte {
if sf.json {
jsonError, ok := err.(*jsonmessage.JSONError)
if !ok {
jsonError = &jsonmessage.JSONError{Message: err.Error()}
}
if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
return append(b, streamNewlineBytes...)
}
return []byte("{\"error\":\"format error\"}" + streamNewline)
// FormatError formats the error as a JSON object
func FormatError(err error) []byte {
jsonError, ok := err.(*jsonmessage.JSONError)
if !ok {
jsonError = &jsonmessage.JSONError{Message: err.Error()}
}
return []byte("Error: " + err.Error() + streamNewline)
if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
return appendNewline(b)
}
return []byte(`{"error":"format error"}` + streamNewline)
}
// FormatProgress formats the progress information for a specified action.
func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
func (sf *jsonProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
return FormatStatus(id, format, a...)
}
// formatProgress formats the progress information for a specified action.
func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
if progress == nil {
progress = &jsonmessage.JSONProgress{}
}
if sf.json {
var auxJSON *json.RawMessage
if aux != nil {
auxJSONBytes, err := json.Marshal(aux)
if err != nil {
return nil
}
auxJSON = new(json.RawMessage)
*auxJSON = auxJSONBytes
}
b, err := json.Marshal(&jsonmessage.JSONMessage{
Status: action,
ProgressMessage: progress.String(),
Progress: progress,
ID: id,
Aux: auxJSON,
})
var auxJSON *json.RawMessage
if aux != nil {
auxJSONBytes, err := json.Marshal(aux)
if err != nil {
return nil
}
return append(b, streamNewlineBytes...)
auxJSON = new(json.RawMessage)
*auxJSON = auxJSONBytes
}
b, err := json.Marshal(&jsonmessage.JSONMessage{
Status: action,
ProgressMessage: progress.String(),
Progress: progress,
ID: id,
Aux: auxJSON,
})
if err != nil {
return nil
}
return appendNewline(b)
}
type rawProgressFormatter struct{}
func (sf *rawProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
return []byte(fmt.Sprintf(format, a...) + streamNewline)
}
func (sf *rawProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
if progress == nil {
progress = &jsonmessage.JSONProgress{}
}
endl := "\r"
if progress.String() == "" {
@ -105,16 +90,23 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessa
// NewProgressOutput returns a progress.Output object that can be passed to
// progress.NewProgressReader.
func (sf *StreamFormatter) NewProgressOutput(out io.Writer, newLines bool) progress.Output {
return &progressOutput{
sf: sf,
out: out,
newLines: newLines,
}
func NewProgressOutput(out io.Writer) progress.Output {
return &progressOutput{sf: &rawProgressFormatter{}, out: out, newLines: true}
}
// NewJSONProgressOutput returns a progress.Output that that formats output
// using JSON objects
func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output {
return &progressOutput{sf: &jsonProgressFormatter{}, out: out, newLines: newLines}
}
type formatProgress interface {
formatStatus(id, format string, a ...interface{}) []byte
formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte
}
type progressOutput struct {
sf *StreamFormatter
sf formatProgress
out io.Writer
newLines bool
}
@ -123,10 +115,10 @@ type progressOutput struct {
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
var formatted []byte
if prog.Message != "" {
formatted = out.sf.FormatStatus(prog.ID, prog.Message)
formatted = out.sf.formatStatus(prog.ID, prog.Message)
} else {
jsonProgress := jsonmessage.JSONProgress{Current: prog.Current, Total: prog.Total, HideCounts: prog.HideCounts}
formatted = out.sf.FormatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux)
formatted = out.sf.formatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux)
}
_, err := out.out.Write(formatted)
if err != nil {
@ -134,39 +126,9 @@ func (out *progressOutput) WriteProgress(prog progress.Progress) error {
}
if out.newLines && prog.LastUpdate {
_, err = out.out.Write(out.sf.FormatStatus("", ""))
_, err = out.out.Write(out.sf.formatStatus("", ""))
return err
}
return nil
}
// StdoutFormatter is a streamFormatter that writes to the standard output.
type StdoutFormatter struct {
io.Writer
*StreamFormatter
}
func (sf *StdoutFormatter) Write(buf []byte) (int, error) {
formattedBuf := sf.StreamFormatter.FormatStream(string(buf))
n, err := sf.Writer.Write(formattedBuf)
if n != len(formattedBuf) {
return n, io.ErrShortWrite
}
return len(buf), err
}
// StderrFormatter is a streamFormatter that writes to the standard error.
type StderrFormatter struct {
io.Writer
*StreamFormatter
}
func (sf *StderrFormatter) Write(buf []byte) (int, error) {
formattedBuf := sf.StreamFormatter.FormatStream("\033[91m" + string(buf) + "\033[0m")
n, err := sf.Writer.Write(formattedBuf)
if n != len(formattedBuf) {
return n, io.ErrShortWrite
}
return len(buf), err
}

View file

@ -3,88 +3,65 @@ package streamformatter
import (
"encoding/json"
"errors"
"reflect"
"strings"
"testing"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFormatStream(t *testing.T) {
sf := NewStreamFormatter()
res := sf.FormatStream("stream")
if string(res) != "stream"+"\r" {
t.Fatalf("%q", res)
}
func TestRawProgressFormatterFormatStatus(t *testing.T) {
sf := rawProgressFormatter{}
res := sf.formatStatus("ID", "%s%d", "a", 1)
assert.Equal(t, "a1\r\n", string(res))
}
func TestFormatJSONStatus(t *testing.T) {
sf := NewStreamFormatter()
res := sf.FormatStatus("ID", "%s%d", "a", 1)
if string(res) != "a1\r\n" {
t.Fatalf("%q", res)
}
}
func TestFormatSimpleError(t *testing.T) {
sf := NewStreamFormatter()
res := sf.FormatError(errors.New("Error for formatter"))
if string(res) != "Error: Error for formatter\r\n" {
t.Fatalf("%q", res)
}
}
func TestJSONFormatStream(t *testing.T) {
sf := NewJSONStreamFormatter()
res := sf.FormatStream("stream")
if string(res) != `{"stream":"stream"}`+"\r\n" {
t.Fatalf("%q", res)
}
}
func TestJSONFormatStatus(t *testing.T) {
sf := NewJSONStreamFormatter()
res := sf.FormatStatus("ID", "%s%d", "a", 1)
if string(res) != `{"status":"a1","id":"ID"}`+"\r\n" {
t.Fatalf("%q", res)
}
}
func TestJSONFormatSimpleError(t *testing.T) {
sf := NewJSONStreamFormatter()
res := sf.FormatError(errors.New("Error for formatter"))
if string(res) != `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}`+"\r\n" {
t.Fatalf("%q", res)
}
}
func TestJSONFormatJSONError(t *testing.T) {
sf := NewJSONStreamFormatter()
err := &jsonmessage.JSONError{Code: 50, Message: "Json error"}
res := sf.FormatError(err)
if string(res) != `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}`+"\r\n" {
t.Fatalf("%q", res)
}
}
func TestJSONFormatProgress(t *testing.T) {
sf := NewJSONStreamFormatter()
func TestRawProgressFormatterFormatProgress(t *testing.T) {
sf := rawProgressFormatter{}
progress := &jsonmessage.JSONProgress{
Current: 15,
Total: 30,
Start: 1,
}
res := sf.FormatProgress("id", "action", progress, nil)
res := sf.formatProgress("id", "action", progress, nil)
out := string(res)
assert.True(t, strings.HasPrefix(out, "action [===="))
assert.Contains(t, out, "15B/30B")
assert.True(t, strings.HasSuffix(out, "\r"))
}
func TestFormatStatus(t *testing.T) {
res := FormatStatus("ID", "%s%d", "a", 1)
expected := `{"status":"a1","id":"ID"}` + streamNewline
assert.Equal(t, expected, string(res))
}
func TestFormatError(t *testing.T) {
res := FormatError(errors.New("Error for formatter"))
expected := `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}` + "\r\n"
assert.Equal(t, expected, string(res))
}
func TestFormatJSONError(t *testing.T) {
err := &jsonmessage.JSONError{Code: 50, Message: "Json error"}
res := FormatError(err)
expected := `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}` + streamNewline
assert.Equal(t, expected, string(res))
}
func TestJsonProgressFormatterFormatProgress(t *testing.T) {
sf := &jsonProgressFormatter{}
progress := &jsonmessage.JSONProgress{
Current: 15,
Total: 30,
Start: 1,
}
res := sf.formatProgress("id", "action", progress, nil)
msg := &jsonmessage.JSONMessage{}
if err := json.Unmarshal(res, msg); err != nil {
t.Fatal(err)
}
if msg.ID != "id" {
t.Fatalf("ID must be 'id', got: %s", msg.ID)
}
if msg.Status != "action" {
t.Fatalf("Status must be 'action', got: %s", msg.Status)
}
require.NoError(t, json.Unmarshal(res, msg))
assert.Equal(t, "id", msg.ID)
assert.Equal(t, "action", msg.Status)
// The progress will always be in the format of:
// [=========================> ] 15B/30B 412910h51m30s
@ -102,7 +79,5 @@ func TestJSONFormatProgress(t *testing.T) {
expectedProgress, expectedProgressShort, msg.ProgressMessage)
}
if !reflect.DeepEqual(msg.Progress, progress) {
t.Fatal("Original progress not equals progress from FormatProgress")
}
assert.Equal(t, progress, msg.Progress)
}

View file

@ -0,0 +1,47 @@
package streamformatter
import (
"encoding/json"
"io"
"github.com/docker/docker/pkg/jsonmessage"
)
type streamWriter struct {
io.Writer
lineFormat func([]byte) string
}
func (sw *streamWriter) Write(buf []byte) (int, error) {
formattedBuf := sw.format(buf)
n, err := sw.Writer.Write(formattedBuf)
if n != len(formattedBuf) {
return n, io.ErrShortWrite
}
return len(buf), err
}
func (sw *streamWriter) format(buf []byte) []byte {
msg := &jsonmessage.JSONMessage{Stream: sw.lineFormat(buf)}
b, err := json.Marshal(msg)
if err != nil {
return FormatError(err)
}
return appendNewline(b)
}
// NewStdoutWriter returns a writer which formats the output as json message
// representing stdout lines
func NewStdoutWriter(out io.Writer) io.Writer {
return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
return string(buf)
}}
}
// NewStderrWriter returns a writer which formats the output as json message
// representing stderr lines
func NewStderrWriter(out io.Writer) io.Writer {
return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
return "\033[91m" + string(buf) + "\033[0m"
}}
}

View file

@ -0,0 +1,35 @@
package streamformatter
import (
"testing"
"bytes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStreamWriterStdout(t *testing.T) {
buffer := &bytes.Buffer{}
content := "content"
sw := NewStdoutWriter(buffer)
size, err := sw.Write([]byte(content))
require.NoError(t, err)
assert.Equal(t, len(content), size)
expected := `{"stream":"content"}` + streamNewline
assert.Equal(t, expected, buffer.String())
}
func TestStreamWriterStderr(t *testing.T) {
buffer := &bytes.Buffer{}
content := "content"
sw := NewStderrWriter(buffer)
size, err := sw.Write([]byte(content))
require.NoError(t, err)
assert.Equal(t, len(content), size)
expected := `{"stream":"\u001b[91mcontent\u001b[0m"}` + streamNewline
assert.Equal(t, expected, buffer.String())
}