diff --git a/engine/streams.go b/engine/streams.go index ff7049074a..ee26fe5f1e 100644 --- a/engine/streams.go +++ b/engine/streams.go @@ -164,3 +164,30 @@ func Tail(src io.Reader, n int, dst *[]string) { *dst = append(*dst, v.(string)) }) } + +// AddEnv starts a new goroutine which will decode all subsequent data +// as a stream of json-encoded objects, and point `dst` to the last +// decoded object. +// The result `env` can be queried using the type-neutral Env interface. +// It is not safe to query `env` until the Output is closed. +func (o *Output) AddEnv() (dst *Env, err error) { + src, err := o.AddPipe() + if err != nil { + return nil, err + } + dst = &Env{} + o.tasks.Add(1) + go func() { + defer o.tasks.Done() + decoder := NewDecoder(src) + for { + env, err := decoder.Decode() + if err != nil { + return + } + *dst= *env + } + }() + return dst, nil +} + diff --git a/engine/streams_test.go b/engine/streams_test.go index 108c9850a5..37720c61ea 100644 --- a/engine/streams_test.go +++ b/engine/streams_test.go @@ -72,6 +72,26 @@ func (w *sentinelWriteCloser) Close() error { return nil } +func TestOutputAddEnv(t *testing.T) { + input := "{\"foo\": \"bar\", \"answer_to_life_the_universe_and_everything\": 42}" + o := NewOutput() + result, err := o.AddEnv() + if err != nil { + t.Fatal(err) + } + o.Write([]byte(input)) + o.Close() + if v := result.Get("foo"); v != "bar" { + t.Errorf("Expected %v, got %v", "bar", v) + } + if v := result.GetInt("answer_to_life_the_universe_and_everything"); v != 42 { + t.Errorf("Expected %v, got %v", 42, v) + } + if v := result.Get("this-value-doesnt-exist"); v != "" { + t.Errorf("Expected %v, got %v", "", v) + } +} + func TestOutputAddClose(t *testing.T) { o := NewOutput() var s sentinelWriteCloser