1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

daemon: support other containerd runtimes (MVP)

Contrary to popular belief, the OCI Runtime specification does not
specify the command-line API for runtimes. Looking at containerd's
architecture from the lens of the OCI Runtime spec, the _shim_ is the
OCI Runtime and runC is "just" an implementation detail of the
io.containerd.runc.v2 runtime. When one configures a non-default runtime
in Docker, what they're really doing is instructing Docker to create
containers using the io.containerd.runc.v2 runtime with a configuration
option telling the runtime that the runC binary is at some non-default
path. Consequently, only OCI runtimes which are compatible with the
io.containerd.runc.v2 shim, such as crun, can be used in this manner.
Other OCI runtimes, including kata-containers v2, come with their own
containerd shim and are not compatible with io.containerd.runc.v2.
As Docker has not historically provided a way to select a non-default
runtime which requires its own shim, runtimes such as kata-containers v2
could not be used with Docker.

Allow other containerd shims to be used with Docker; no daemon
configuration required. If the daemon is instructed to create a
container with a runtime name which does not match any of the configured
or stock runtimes, it passes the name along to containerd verbatim. A
user can start a container with the kata-containers runtime, for
example, simply by calling

    docker run --runtime io.containerd.kata.v2

Runtime names which containerd would interpret as a path to an arbitrary
binary are disallowed. While handy for development and testing it is not
strictly necessary and would allow anyone with Engine API access to
trivially execute any binary on the host as root, so we have decided it
would be safest for our users if it was not allowed.

It is not yet possible to set an alternative containerd shim as the
default runtime; it can only be configured per-container.

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit 547da0d575)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Cory Snider 2022-07-20 16:12:01 -04:00 committed by Sebastiaan van Stijn
parent ad0ee82f0d
commit 6de52a29a8
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
29 changed files with 10504 additions and 16 deletions

View file

@ -707,8 +707,8 @@ func verifyPlatformContainerSettings(daemon *Daemon, hostConfig *containertypes.
hostConfig.Runtime = daemon.configStore.GetDefaultRuntimeName() hostConfig.Runtime = daemon.configStore.GetDefaultRuntimeName()
} }
if rt := daemon.configStore.GetRuntime(hostConfig.Runtime); rt == nil { if _, err := daemon.getRuntime(hostConfig.Runtime); err != nil {
return warnings, fmt.Errorf("Unknown runtime specified %s", hostConfig.Runtime) return warnings, err
} }
parser := volumemounts.NewParser() parser := volumemounts.NewParser()

View file

@ -11,6 +11,7 @@ import (
"strings" "strings"
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options" v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/daemon/config" "github.com/docker/docker/daemon/config"
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
@ -116,7 +117,10 @@ func (daemon *Daemon) rewriteRuntimePath(name, p string, args []string) (string,
func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) { func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) {
rt := daemon.configStore.GetRuntime(name) rt := daemon.configStore.GetRuntime(name)
if rt == nil { if rt == nil {
return nil, errdefs.InvalidParameter(errors.Errorf("runtime not found in config: %s", name)) if !isPermissibleC8dRuntimeName(name) {
return nil, errdefs.InvalidParameter(errors.Errorf("unknown or invalid runtime name: %s", name))
}
return &types.Runtime{Shim: &types.ShimConfig{Binary: name}}, nil
} }
if len(rt.Args) > 0 { if len(rt.Args) > 0 {
@ -134,3 +138,37 @@ func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) {
return rt, nil return rt, nil
} }
// isPermissibleC8dRuntimeName tests whether name is safe to pass into
// containerd as a runtime name, and whether the name is well-formed.
// It does not check if the runtime is installed.
//
// A runtime name containing slash characters is interpreted by containerd as
// the path to a runtime binary. If we allowed this, anyone with Engine API
// access could get containerd to execute an arbitrary binary as root. Although
// Engine API access is already equivalent to root on the host, the runtime name
// has not historically been a vector to run arbitrary code as root so users are
// not expecting it to become one.
//
// This restriction is not configurable. There are viable workarounds for
// legitimate use cases: administrators and runtime developers can make runtimes
// available for use with Docker by installing them onto PATH following the
// [binary naming convention] for containerd Runtime v2.
//
// [binary naming convention]: https://github.com/containerd/containerd/blob/main/runtime/v2/README.md#binary-naming
func isPermissibleC8dRuntimeName(name string) bool {
// containerd uses a rather permissive test to validate runtime names:
//
// - Any name for which filepath.IsAbs(name) is interpreted as the absolute
// path to a shim binary. We want to block this behaviour.
// - Any name which contains at least one '.' character and no '/' characters
// and does not begin with a '.' character is a valid runtime name. The shim
// binary name is derived from the final two components of the name and
// searched for on the PATH. The name "a.." is technically valid per
// containerd's implementation: it would resolve to a binary named
// "containerd-shim---".
//
// https://github.com/containerd/containerd/blob/11ded166c15f92450958078cd13c6d87131ec563/runtime/v2/manager.go#L297-L317
// https://github.com/containerd/containerd/blob/11ded166c15f92450958078cd13c6d87131ec563/runtime/v2/shim/util.go#L83-L93
return !filepath.IsAbs(name) && !strings.ContainsRune(name, '/') && shim.BinaryName(name) != ""
}

107
daemon/runtime_unix_test.go Normal file
View file

@ -0,0 +1,107 @@
//go:build !windows
// +build !windows
package daemon
import (
"os"
"path/filepath"
"testing"
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
"github.com/docker/docker/api/types"
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/errdefs"
)
func TestGetRuntime(t *testing.T) {
// Configured runtimes can have any arbitrary name, including names
// which would not be allowed as implicit runtime names. Explicit takes
// precedence over implicit.
const configuredRtName = "my/custom.shim.v1"
configuredRuntime := types.Runtime{Path: "/bin/true"}
d := &Daemon{configStore: config.New()}
d.configStore.Root = t.TempDir()
assert.Assert(t, os.Mkdir(filepath.Join(d.configStore.Root, "runtimes"), 0700))
d.configStore.Runtimes = map[string]types.Runtime{
configuredRtName: configuredRuntime,
}
configureRuntimes(d.configStore)
assert.Assert(t, d.loadRuntimes())
stockRuntime, ok := d.configStore.Runtimes[config.StockRuntimeName]
assert.Assert(t, ok, "stock runtime could not be found (test needs to be updated)")
configdOpts := *stockRuntime.Shim.Opts.(*v2runcoptions.Options)
configdOpts.BinaryName = configuredRuntime.Path
wantConfigdRuntime := configuredRuntime
wantConfigdRuntime.Shim = &types.ShimConfig{
Binary: stockRuntime.Shim.Binary,
Opts: &configdOpts,
}
for _, tt := range []struct {
name, runtime string
want *types.Runtime
}{
{
name: "StockRuntime",
runtime: config.StockRuntimeName,
want: &stockRuntime,
},
{
name: "ShimName",
runtime: "io.containerd.my-shim.v42",
want: &types.Runtime{Shim: &types.ShimConfig{Binary: "io.containerd.my-shim.v42"}},
},
{
// containerd is pretty loose about the format of runtime names. Perhaps too
// loose. The only requirements are that the name contain a dot and (depending
// on the containerd version) not start with a dot. It does not enforce any
// particular format of the dot-delimited components of the name.
name: "VersionlessShimName",
runtime: "io.containerd.my-shim",
want: &types.Runtime{Shim: &types.ShimConfig{Binary: "io.containerd.my-shim"}},
},
{
name: "IllformedShimName",
runtime: "myshim",
},
{
name: "EmptyString",
runtime: "",
},
{
name: "PathToShim",
runtime: "/path/to/runc",
},
{
name: "PathToShimName",
runtime: "/path/to/io.containerd.runc.v2",
},
{
name: "RelPathToShim",
runtime: "my/io.containerd.runc.v2",
},
{
name: "ConfiguredRuntime",
runtime: configuredRtName,
want: &wantConfigdRuntime,
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
got, err := d.getRuntime(tt.runtime)
assert.Check(t, is.DeepEqual(got, tt.want))
if tt.want != nil {
assert.Check(t, err)
} else {
assert.Check(t, errdefs.IsInvalidParameter(err))
}
})
}
}

View file

@ -39,6 +39,7 @@ import (
"github.com/moby/sys/mount" "github.com/moby/sys/mount"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"gotest.tools/v3/assert" "gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
"gotest.tools/v3/icmd" "gotest.tools/v3/icmd"
"gotest.tools/v3/poll" "gotest.tools/v3/poll"
) )
@ -2384,7 +2385,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
// Run with "vm" // Run with "vm"
out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory")) assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
// Reset config to only have the default // Reset config to only have the default
config = ` config = `
{ {
@ -2404,11 +2405,11 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
// Run with "oci" // Run with "oci"
out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci")) assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
// Start previously created container with oci // Start previously created container with oci
out, err = s.d.Cmd("start", "oci-runtime-ls") out, err = s.d.Cmd("start", "oci-runtime-ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci")) assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
// Check that we can't override the default runtime // Check that we can't override the default runtime
config = ` config = `
{ {
@ -2426,7 +2427,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
content, err := s.d.ReadLogFile() content, err := s.d.ReadLogFile()
assert.NilError(c, err) assert.NilError(c, err)
assert.Assert(c, strings.Contains(string(content), `file configuration validation failed: runtime name 'runc' is reserved`)) assert.Assert(c, is.Contains(string(content), `file configuration validation failed: runtime name 'runc' is reserved`))
// Check that we can select a default runtime // Check that we can select a default runtime
config = ` config = `
{ {
@ -2451,7 +2452,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
out, err = s.d.Cmd("run", "--rm", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "busybox", "ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory")) assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
// Run with default runtime explicitly // Run with default runtime explicitly
out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls")
assert.NilError(c, err, out) assert.NilError(c, err, out)
@ -2475,7 +2476,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *testing.T) {
// Run with "vm" // Run with "vm"
out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory")) assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
// Start a daemon without any extra runtimes // Start a daemon without any extra runtimes
s.d.Stop(c) s.d.Stop(c)
s.d.StartWithBusybox(c) s.d.StartWithBusybox(c)
@ -2487,25 +2488,25 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *testing.T) {
// Run with "oci" // Run with "oci"
out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci")) assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
// Start previously created container with oci // Start previously created container with oci
out, err = s.d.Cmd("start", "oci-runtime-ls") out, err = s.d.Cmd("start", "oci-runtime-ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci")) assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
// Check that we can't override the default runtime // Check that we can't override the default runtime
s.d.Stop(c) s.d.Stop(c)
assert.Assert(c, s.d.StartWithError("--add-runtime", "runc=my-runc") != nil) assert.Assert(c, s.d.StartWithError("--add-runtime", "runc=my-runc") != nil)
content, err := s.d.ReadLogFile() content, err := s.d.ReadLogFile()
assert.NilError(c, err) assert.NilError(c, err)
assert.Assert(c, strings.Contains(string(content), `runtime name 'runc' is reserved`)) assert.Assert(c, is.Contains(string(content), `runtime name 'runc' is reserved`))
// Check that we can select a default runtime // Check that we can select a default runtime
s.d.Stop(c) s.d.Stop(c)
s.d.StartWithBusybox(c, "--default-runtime=vm", "--add-runtime", "oci=runc", "--add-runtime", "vm=/usr/local/bin/vm-manager") s.d.StartWithBusybox(c, "--default-runtime=vm", "--add-runtime", "oci=runc", "--add-runtime", "vm=/usr/local/bin/vm-manager")
out, err = s.d.Cmd("run", "--rm", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "busybox", "ls")
assert.ErrorContains(c, err, "", out) assert.ErrorContains(c, err, "", out)
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory")) assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
// Run with default runtime explicitly // Run with default runtime explicitly
out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls") out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls")
assert.NilError(c, err, out) assert.NilError(c, err, out)

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"io" "io"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"testing" "testing"
@ -15,7 +16,9 @@ import (
"github.com/docker/docker/api/types/versions" "github.com/docker/docker/api/types/versions"
"github.com/docker/docker/integration/internal/container" "github.com/docker/docker/integration/internal/container"
net "github.com/docker/docker/integration/internal/network" net "github.com/docker/docker/integration/internal/network"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/system"
"github.com/docker/docker/testutil/daemon"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"gotest.tools/v3/assert" "gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp" is "gotest.tools/v3/assert/cmp"
@ -214,3 +217,57 @@ func TestRunConsoleSize(t *testing.T) {
assert.Equal(t, strings.TrimSpace(b.String()), "123 57") assert.Equal(t, strings.TrimSpace(b.String()), "123 57")
} }
func TestRunWithAlternativeContainerdShim(t *testing.T) {
skip.If(t, testEnv.IsRemoteDaemon)
skip.If(t, testEnv.DaemonInfo.OSType != "linux")
realShimPath, err := exec.LookPath("containerd-shim-runc-v2")
assert.Assert(t, err)
realShimPath, err = filepath.Abs(realShimPath)
assert.Assert(t, err)
// t.TempDir() can't be used here as the temporary directory returned by
// that function cannot be accessed by the fake-root user for rootless
// Docker. It creates a nested hierarchy of directories where the
// outermost has permission 0700.
shimDir, err := os.MkdirTemp("", t.Name())
assert.Assert(t, err)
t.Cleanup(func() {
if err := os.RemoveAll(shimDir); err != nil {
t.Errorf("shimDir RemoveAll cleanup: %v", err)
}
})
assert.Assert(t, os.Chmod(shimDir, 0777))
shimDir, err = filepath.Abs(shimDir)
assert.Assert(t, err)
assert.Assert(t, os.Symlink(realShimPath, filepath.Join(shimDir, "containerd-shim-realfake-v42")))
d := daemon.New(t,
daemon.WithEnvVars("PATH="+shimDir+":"+os.Getenv("PATH")),
daemon.WithContainerdSocket(""), // A new containerd instance needs to be started which inherits the PATH env var defined above.
)
d.StartWithBusybox(t)
defer d.Stop(t)
client := d.NewClientT(t)
ctx := context.Background()
cID := container.Run(ctx, t, client,
container.WithImage("busybox"),
container.WithCmd("sh", "-c", `echo 'Hello, world!'`),
container.WithRuntime("io.containerd.realfake.v42"),
)
poll.WaitOn(t, container.IsStopped(ctx, client, cID), poll.WithDelay(100*time.Millisecond))
out, err := client.ContainerLogs(ctx, cID, types.ContainerLogsOptions{ShowStdout: true})
assert.NilError(t, err)
defer out.Close()
var b bytes.Buffer
_, err = stdcopy.StdCopy(&b, io.Discard, out)
assert.NilError(t, err)
assert.Equal(t, strings.TrimSpace(b.String()), "Hello, world!")
}

View file

@ -234,3 +234,10 @@ func WithConsoleSize(width, height uint) func(*TestContainerConfig) {
c.HostConfig.ConsoleSize = [2]uint{height, width} c.HostConfig.ConsoleSize = [2]uint{height, width}
} }
} }
// WithRuntime sets the runtime to use to start the container
func WithRuntime(name string) func(*TestContainerConfig) {
return func(c *TestContainerConfig) {
c.HostConfig.Runtime = name
}
}

View file

@ -76,6 +76,7 @@ type Daemon struct {
log LogT log LogT
pidFile string pidFile string
args []string args []string
extraEnv []string
containerdSocket string containerdSocket string
rootlessUser *user.User rootlessUser *user.User
rootlessXDGRuntimeDir string rootlessXDGRuntimeDir string
@ -334,9 +335,10 @@ func (d *Daemon) StartWithLogFile(out *os.File, providedArgs ...string) error {
dockerdBinary = "sudo" dockerdBinary = "sudo"
d.args = append(d.args, d.args = append(d.args,
"-u", d.rootlessUser.Username, "-u", d.rootlessUser.Username,
"-E", "XDG_RUNTIME_DIR="+d.rootlessXDGRuntimeDir, "--preserve-env",
"-E", "HOME="+d.rootlessUser.HomeDir, "--preserve-env=PATH", // Pass through PATH, overriding secure_path.
"-E", "PATH="+os.Getenv("PATH"), "XDG_RUNTIME_DIR="+d.rootlessXDGRuntimeDir,
"HOME="+d.rootlessUser.HomeDir,
"--", "--",
defaultDockerdRootlessBinary, defaultDockerdRootlessBinary,
) )
@ -392,6 +394,7 @@ func (d *Daemon) StartWithLogFile(out *os.File, providedArgs ...string) error {
d.args = append(d.args, providedArgs...) d.args = append(d.args, providedArgs...)
d.cmd = exec.Command(dockerdBinary, d.args...) d.cmd = exec.Command(dockerdBinary, d.args...)
d.cmd.Env = append(os.Environ(), "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1") d.cmd.Env = append(os.Environ(), "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1")
d.cmd.Env = append(d.cmd.Env, d.extraEnv...)
d.cmd.Stdout = out d.cmd.Stdout = out
d.cmd.Stderr = out d.cmd.Stderr = out
d.logFile = out d.logFile = out

View file

@ -122,3 +122,10 @@ func WithOOMScoreAdjust(score int) Option {
d.OOMScoreAdjust = score d.OOMScoreAdjust = score
} }
} }
// WithEnvVars sets additional environment variables for the daemon
func WithEnvVars(vars ...string) Option {
return func(d *Daemon) {
d.extraEnv = append(d.extraEnv, vars...)
}
}

View file

@ -0,0 +1,18 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package events defines the ttrpc event service.
package events

View file

@ -0,0 +1,760 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
package events
import (
context "context"
fmt "fmt"
github_com_containerd_ttrpc "github.com/containerd/ttrpc"
github_com_containerd_typeurl "github.com/containerd/typeurl"
proto "github.com/gogo/protobuf/proto"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
types "github.com/gogo/protobuf/types"
io "io"
math "math"
math_bits "math/bits"
reflect "reflect"
strings "strings"
time "time"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ForwardRequest struct {
Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ForwardRequest) Reset() { *m = ForwardRequest{} }
func (*ForwardRequest) ProtoMessage() {}
func (*ForwardRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{0}
}
func (m *ForwardRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ForwardRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ForwardRequest.Merge(m, src)
}
func (m *ForwardRequest) XXX_Size() int {
return m.Size()
}
func (m *ForwardRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ForwardRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo
type Envelope struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Envelope) Reset() { *m = Envelope{} }
func (*Envelope) ProtoMessage() {}
func (*Envelope) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{1}
}
func (m *Envelope) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Envelope.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Envelope) XXX_Merge(src proto.Message) {
xxx_messageInfo_Envelope.Merge(m, src)
}
func (m *Envelope) XXX_Size() int {
return m.Size()
}
func (m *Envelope) XXX_DiscardUnknown() {
xxx_messageInfo_Envelope.DiscardUnknown(m)
}
var xxx_messageInfo_Envelope proto.InternalMessageInfo
func init() {
proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest")
proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope")
}
func init() {
proto.RegisterFile("github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", fileDescriptor_19f98672016720b5)
}
var fileDescriptor_19f98672016720b5 = []byte{
// 396 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
0x10, 0x8d, 0x61, 0x77, 0x69, 0x8d, 0xc4, 0xc1, 0xaa, 0x50, 0x08, 0x28, 0x59, 0x2d, 0x97, 0x15,
0x12, 0xb6, 0x76, 0xf7, 0x06, 0x17, 0xa8, 0x28, 0x12, 0x1c, 0x23, 0x84, 0x2a, 0x90, 0x10, 0x6e,
0x3a, 0x4d, 0x2d, 0x25, 0xb6, 0x49, 0x9c, 0xa0, 0xde, 0xfa, 0x09, 0x7c, 0x0c, 0x17, 0xfe, 0xa0,
0x47, 0x8e, 0x9c, 0x80, 0xe6, 0x4b, 0x50, 0x9d, 0xa4, 0x81, 0xf6, 0x40, 0xa5, 0xbd, 0xbd, 0xcc,
0x7b, 0x6f, 0xde, 0xcc, 0xc4, 0xf8, 0x75, 0x2c, 0xcc, 0xbc, 0x98, 0xd0, 0x48, 0xa5, 0x2c, 0x52,
0xd2, 0x70, 0x21, 0x21, 0x9b, 0xfe, 0x0d, 0xb9, 0x16, 0x2c, 0x87, 0xac, 0x14, 0x11, 0xe4, 0xcc,
0x98, 0x4c, 0x47, 0x0c, 0x4a, 0x90, 0x26, 0x67, 0xe5, 0x45, 0x83, 0xa8, 0xce, 0x94, 0x51, 0xe4,
0x61, 0xe7, 0xa2, 0xad, 0x83, 0x36, 0x0a, 0x6b, 0xa4, 0xe5, 0x85, 0xf7, 0xec, 0xbf, 0x81, 0xb6,
0xd9, 0xa4, 0x98, 0x31, 0x9d, 0x14, 0xb1, 0x90, 0x6c, 0x26, 0x20, 0x99, 0x6a, 0x6e, 0xe6, 0x75,
0x8c, 0x37, 0x88, 0x55, 0xac, 0x2c, 0x64, 0x1b, 0xd4, 0x54, 0xef, 0xc5, 0x4a, 0xc5, 0x09, 0x74,
0x6e, 0x2e, 0x17, 0x0d, 0x75, 0x7f, 0x97, 0x82, 0x54, 0x9b, 0x96, 0x0c, 0x76, 0x49, 0x23, 0x52,
0xc8, 0x0d, 0x4f, 0x75, 0x2d, 0x38, 0x7b, 0x8f, 0xef, 0xbc, 0x54, 0xd9, 0x67, 0x9e, 0x4d, 0x43,
0xf8, 0x54, 0x40, 0x6e, 0xc8, 0x2b, 0xdc, 0x03, 0x59, 0x42, 0xa2, 0x34, 0xb8, 0xe8, 0x14, 0x9d,
0xdf, 0xbe, 0x7c, 0x4c, 0x0f, 0x58, 0x9d, 0x8e, 0x1a, 0x53, 0xb8, 0xb5, 0x9f, 0x7d, 0x45, 0xb8,
0xd7, 0x96, 0xc9, 0x10, 0xf7, 0xb7, 0xe1, 0x4d, 0x63, 0x8f, 0xd6, 0xe3, 0xd1, 0x76, 0x3c, 0xfa,
0xa6, 0x55, 0x0c, 0x7b, 0xab, 0x9f, 0x81, 0xf3, 0xe5, 0x57, 0x80, 0xc2, 0xce, 0x46, 0x1e, 0xe0,
0xbe, 0xe4, 0x29, 0xe4, 0x9a, 0x47, 0xe0, 0xde, 0x38, 0x45, 0xe7, 0xfd, 0xb0, 0x2b, 0x90, 0x01,
0x3e, 0x36, 0x4a, 0x8b, 0xc8, 0xbd, 0x69, 0x99, 0xfa, 0x83, 0x3c, 0xc2, 0xc7, 0x76, 0x54, 0xf7,
0xc8, 0x66, 0x0e, 0xf6, 0x32, 0x9f, 0xcb, 0x45, 0x58, 0x4b, 0x9e, 0x1c, 0x2d, 0xbf, 0x05, 0xe8,
0xf2, 0x23, 0x3e, 0x19, 0xd9, 0xe5, 0xc8, 0x5b, 0x7c, 0xab, 0xb9, 0x0e, 0xb9, 0x3a, 0xe8, 0x08,
0xff, 0xde, 0xd2, 0xbb, 0xbb, 0x17, 0x36, 0xda, 0xfc, 0x9c, 0xe1, 0x87, 0xd5, 0xda, 0x77, 0x7e,
0xac, 0x7d, 0x67, 0x59, 0xf9, 0x68, 0x55, 0xf9, 0xe8, 0x7b, 0xe5, 0xa3, 0xdf, 0x95, 0x8f, 0xde,
0xbd, 0xb8, 0xd6, 0x8b, 0x7d, 0x5a, 0xa3, 0xb1, 0x33, 0x46, 0x93, 0x13, 0x9b, 0x79, 0xf5, 0x27,
0x00, 0x00, 0xff, 0xff, 0xd4, 0x90, 0xbd, 0x09, 0x04, 0x03, 0x00, 0x00,
}
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (m *Envelope) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
// unhandled: timestamp
case "namespace":
return string(m.Namespace), len(m.Namespace) > 0
case "topic":
return string(m.Topic), len(m.Topic) > 0
case "event":
decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event)
if err != nil {
return "", false
}
adaptor, ok := decoded.(interface{ Field([]string) (string, bool) })
if !ok {
return "", false
}
return adaptor.Field(fieldpath[1:])
}
return "", false
}
func (m *ForwardRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ForwardRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Envelope != nil {
{
size, err := m.Envelope.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintEvents(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *Envelope) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Envelope) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Event != nil {
{
size, err := m.Event.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintEvents(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
if len(m.Topic) > 0 {
i -= len(m.Topic)
copy(dAtA[i:], m.Topic)
i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic)))
i--
dAtA[i] = 0x1a
}
if len(m.Namespace) > 0 {
i -= len(m.Namespace)
copy(dAtA[i:], m.Namespace)
i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace)))
i--
dAtA[i] = 0x12
}
n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):])
if err3 != nil {
return 0, err3
}
i -= n3
i = encodeVarintEvents(dAtA, i, uint64(n3))
i--
dAtA[i] = 0xa
return len(dAtA) - i, nil
}
func encodeVarintEvents(dAtA []byte, offset int, v uint64) int {
offset -= sovEvents(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *ForwardRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Envelope != nil {
l = m.Envelope.Size()
n += 1 + l + sovEvents(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Envelope) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)
n += 1 + l + sovEvents(uint64(l))
l = len(m.Namespace)
if l > 0 {
n += 1 + l + sovEvents(uint64(l))
}
l = len(m.Topic)
if l > 0 {
n += 1 + l + sovEvents(uint64(l))
}
if m.Event != nil {
l = m.Event.Size()
n += 1 + l + sovEvents(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovEvents(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozEvents(x uint64) (n int) {
return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *ForwardRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&ForwardRequest{`,
`Envelope:` + strings.Replace(this.Envelope.String(), "Envelope", "Envelope", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func (this *Envelope) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&Envelope{`,
`Timestamp:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timestamp), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`,
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func valueToStringEvents(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
type EventsService interface {
Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error)
}
func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) {
srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{
"Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req ForwardRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Forward(ctx, &req)
},
})
}
type eventsClient struct {
client *github_com_containerd_ttrpc.Client
}
func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService {
return &eventsClient{
client: client,
}
}
func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) {
var resp types.Empty
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (m *ForwardRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Envelope == nil {
m.Envelope = &Envelope{}
}
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Envelope) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Envelope: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Namespace = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Topic = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Event == nil {
m.Event = &types.Any{}
}
if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipEvents(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthEvents
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupEvents
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthEvents
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthEvents = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowEvents = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupEvents = fmt.Errorf("proto: unexpected end of group")
)

View file

@ -0,0 +1,48 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package containerd.services.events.ttrpc.v1;
import weak "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto";
import weak "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events";
service Events {
// Forward sends an event that has already been packaged into an envelope
// with a timestamp and namespace.
//
// This is useful if earlier timestamping is required or when forwarding on
// behalf of another component, namespace or publisher.
rpc Forward(ForwardRequest) returns (google.protobuf.Empty);
}
message ForwardRequest {
Envelope envelope = 1;
}
message Envelope {
option (containerd.plugin.fieldpath) = true;
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string namespace = 2;
string topic = 3;
google.protobuf.Any event = 4;
}

View file

@ -0,0 +1,109 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shutdown
import (
"context"
"errors"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// ErrShutdown is the error condition when a context has been fully shutdown
var ErrShutdown = errors.New("shutdown")
// Service is used to facilitate shutdown by through callback
// registration and shutdown initiation
type Service interface {
// Shutdown initiates shutdown
Shutdown()
// RegisterCallback registers functions to be called on shutdown and before
// the shutdown channel is closed. A callback error will propagate to the
// context error
RegisterCallback(func(context.Context) error)
}
// WithShutdown returns a context which is similar to a cancel context, but
// with callbacks which can propagate to the context error. Unlike a cancel
// context, the shutdown context cannot be canceled from the parent context.
// However, future child contexes will be canceled upon shutdown.
func WithShutdown(ctx context.Context) (context.Context, Service) {
ss := &shutdownService{
Context: ctx,
doneC: make(chan struct{}),
timeout: 30 * time.Second,
}
return ss, ss
}
type shutdownService struct {
context.Context
mu sync.Mutex
isShutdown bool
callbacks []func(context.Context) error
doneC chan struct{}
err error
timeout time.Duration
}
func (s *shutdownService) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
if s.isShutdown {
return
}
s.isShutdown = true
go func(callbacks []func(context.Context) error) {
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()
grp, ctx := errgroup.WithContext(ctx)
for i := range callbacks {
fn := callbacks[i]
grp.Go(func() error { return fn(ctx) })
}
err := grp.Wait()
if err == nil {
err = ErrShutdown
}
s.mu.Lock()
s.err = err
close(s.doneC)
s.mu.Unlock()
}(s.callbacks)
}
func (s *shutdownService) Done() <-chan struct{} {
return s.doneC
}
func (s *shutdownService) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.err
}
func (s *shutdownService) RegisterCallback(fn func(context.Context) error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.callbacks == nil {
s.callbacks = []func(context.Context) error{}
}
s.callbacks = append(s.callbacks, fn)
}

View file

@ -0,0 +1,120 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpcutil
import (
"errors"
"fmt"
"sync"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/pkg/dialer"
"github.com/containerd/ttrpc"
)
const ttrpcDialTimeout = 5 * time.Second
type ttrpcConnector func() (*ttrpc.Client, error)
// Client is the client to interact with TTRPC part of containerd server (plugins, events)
type Client struct {
mu sync.Mutex
connector ttrpcConnector
client *ttrpc.Client
closed bool
}
// NewClient returns a new containerd TTRPC client that is connected to the containerd instance provided by address
func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) {
connector := func() (*ttrpc.Client, error) {
conn, err := dialer.Dialer(address, ttrpcDialTimeout)
if err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
client := ttrpc.NewClient(conn, opts...)
return client, nil
}
return &Client{
connector: connector,
}, nil
}
// Reconnect re-establishes the TTRPC connection to the containerd daemon
func (c *Client) Reconnect() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.connector == nil {
return errors.New("unable to reconnect to containerd, no connector available")
}
if c.closed {
return errors.New("client is closed")
}
if c.client != nil {
if err := c.client.Close(); err != nil {
return err
}
}
client, err := c.connector()
if err != nil {
return err
}
c.client = client
return nil
}
// EventsService creates an EventsService client
func (c *Client) EventsService() (v1.EventsService, error) {
client, err := c.Client()
if err != nil {
return nil, err
}
return v1.NewEventsClient(client), nil
}
// Client returns the underlying TTRPC client object
func (c *Client) Client() (*ttrpc.Client, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.client == nil {
client, err := c.connector()
if err != nil {
return nil, err
}
c.client = client
}
return c.client, nil
}
// Close closes the clients TTRPC connection to containerd
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
c.closed = true
if c.client != nil {
return c.client.Close()
}
return nil
}

View file

@ -0,0 +1,169 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"sync"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/ttrpcutil"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
)
const (
queueSize = 2048
maxRequeue = 5
)
type item struct {
ev *v1.Envelope
ctx context.Context
count int
}
// NewPublisher creates a new remote events publisher
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
client, err := ttrpcutil.NewClient(address)
if err != nil {
return nil, err
}
l := &RemoteEventsPublisher{
client: client,
closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
}
go l.processQueue()
return l, nil
}
// RemoteEventsPublisher forwards events to a ttrpc server
type RemoteEventsPublisher struct {
client *ttrpcutil.Client
closed chan struct{}
closer sync.Once
requeue chan *item
}
// Done returns a channel which closes when done
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}
// Close closes the remote connection and closes the done channel
func (l *RemoteEventsPublisher) Close() (err error) {
err = l.client.Close()
l.closer.Do(func() {
close(l.closed)
})
return err
}
func (l *RemoteEventsPublisher) processQueue() {
for i := range l.requeue {
if i.count > maxRequeue {
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
// drop the event
continue
}
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
logrus.WithError(err).Error("forward event")
l.queue(i)
}
}
}
func (l *RemoteEventsPublisher) queue(i *item) {
go func() {
i.count++
// re-queue after a short delay
time.Sleep(time.Duration(1*i.count) * time.Second)
l.requeue <- i
}()
}
// Publish publishes the event by forwarding it to the configured ttrpc server
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
i := &item{
ev: &v1.Envelope{
Timestamp: time.Now(),
Namespace: ns,
Topic: topic,
Event: any,
},
ctx: ctx,
}
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
l.queue(i)
return err
}
return nil
}
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
service, err := l.client.EventsService()
if err == nil {
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = service.Forward(fCtx, req)
cancel()
if err == nil {
return nil
}
}
if err != ttrpc.ErrClosed {
return err
}
// Reconnect and retry request
if err = l.client.Reconnect(); err != nil {
return err
}
service, err = l.client.EventsService()
if err != nil {
return err
}
// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = service.Forward(fCtx, req)
cancel()
if err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,501 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"errors"
"flag"
"fmt"
"io"
"os"
"runtime"
"runtime/debug"
"strings"
"time"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/containerd/plugin"
shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/version"
"github.com/containerd/ttrpc"
"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
)
// Publisher for events
type Publisher interface {
events.Publisher
io.Closer
}
// StartOpts describes shim start configuration received from containerd
type StartOpts struct {
ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry
ContainerdBinary string
Address string
TTRPCAddress string
}
type StopStatus struct {
Pid int
ExitStatus int
ExitedAt time.Time
}
// Init func for the creation of a shim server
// TODO(2.0): Remove init function
type Init func(context.Context, string, Publisher, func()) (Shim, error)
// Shim server interface
// TODO(2.0): Remove unified shim interface
type Shim interface {
shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, opts StartOpts) (string, error)
}
// Manager is the interface which manages the shim process
type Manager interface {
Name() string
Start(ctx context.Context, id string, opts StartOpts) (string, error)
Stop(ctx context.Context, id string) (StopStatus, error)
}
// OptsKey is the context key for the Opts value.
type OptsKey struct{}
// Opts are context options associated with the shim invocation.
type Opts struct {
BundlePath string
Debug bool
}
// BinaryOpts allows the configuration of a shims binary setup
type BinaryOpts func(*Config)
// Config of shim binary options provided by shim implementations
type Config struct {
// NoSubreaper disables setting the shim as a child subreaper
NoSubreaper bool
// NoReaper disables the shim binary from reaping any child process implicitly
NoReaper bool
// NoSetupLogger disables automatic configuration of logrus to use the shim FIFO
NoSetupLogger bool
}
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
type taskService struct {
shimapi.TaskService
}
func (t taskService) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, t.TaskService)
return nil
}
var (
debugFlag bool
versionFlag bool
id string
namespaceFlag string
socketFlag string
bundlePath string
addressFlag string
containerdBinaryFlag string
action string
)
const (
ttrpcAddressEnv = "TTRPC_ADDRESS"
)
func parseFlags() {
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&id, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
flag.Parse()
action = flag.Arg(0)
}
func setRuntime() {
debug.SetGCPercent(40)
go func() {
for range time.Tick(30 * time.Second) {
debug.FreeOSMemory()
}
}()
if os.Getenv("GOMAXPROCS") == "" {
// If GOMAXPROCS hasn't been set, we default to a value of 2 to reduce
// the number of Go stacks present in the shim.
runtime.GOMAXPROCS(2)
}
}
func setLogger(ctx context.Context, id string) (context.Context, error) {
l := log.G(ctx)
l.Logger.SetFormatter(&logrus.TextFormatter{
TimestampFormat: log.RFC3339NanoFixed,
FullTimestamp: true,
})
if debugFlag {
l.Logger.SetLevel(logrus.DebugLevel)
}
f, err := openLog(ctx, id)
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
return ctx, err
}
l.Logger.SetOutput(f)
return log.WithLogger(ctx, l), nil
}
// Run initializes and runs a shim server
// TODO(2.0): Remove function
func Run(name string, initFunc Init, opts ...BinaryOpts) {
var config Config
for _, o := range opts {
o(&config)
}
ctx := context.Background()
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name))
if err := run(ctx, nil, initFunc, name, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s", name, err)
os.Exit(1)
}
}
// TODO(2.0): Remove this type
type shimToManager struct {
shim Shim
name string
}
func (stm shimToManager) Name() string {
return stm.name
}
func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) {
opts.ID = id
return stm.shim.StartShim(ctx, opts)
}
func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) {
// shim must already have id
dr, err := stm.shim.Cleanup(ctx)
if err != nil {
return StopStatus{}, err
}
return StopStatus{
Pid: int(dr.Pid),
ExitStatus: int(dr.ExitStatus),
ExitedAt: dr.ExitedAt,
}, nil
}
// RunManager initialzes and runs a shim server
// TODO(2.0): Rename to Run
func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) {
var config Config
for _, o := range opts {
o(&config)
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name()))
if err := run(ctx, manager, nil, "", config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err)
os.Exit(1)
}
}
func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error {
parseFlags()
if versionFlag {
fmt.Printf("%s:\n", os.Args[0])
fmt.Println(" Version: ", version.Version)
fmt.Println(" Revision:", version.Revision)
fmt.Println(" Go version:", version.GoVersion)
fmt.Println("")
return nil
}
if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty")
}
setRuntime()
signals, err := setupSignals(config)
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
return err
}
if !config.NoSubreaper {
if err := subreaper(); err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
return err
}
}
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
publisher, err := NewPublisher(ttrpcAddress)
if err != nil {
return err
}
defer publisher.Close()
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx, sd := shutdown.WithShutdown(ctx)
defer sd.Shutdown()
if manager == nil {
service, err := initFunc(ctx, id, publisher, sd.Shutdown)
if err != nil {
return err
}
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
Requires: []plugin.Type{
plugin.EventPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return taskService{service}, nil
},
})
manager = shimToManager{
shim: service,
name: name,
}
}
// Handle explicit actions
switch action {
case "delete":
logger := log.G(ctx).WithFields(logrus.Fields{
"pid": os.Getpid(),
"namespace": namespaceFlag,
})
go reap(ctx, logger, signals)
ss, err := manager.Stop(ctx, id)
if err != nil {
return err
}
data, err := proto.Marshal(&shimapi.DeleteResponse{
Pid: uint32(ss.Pid),
ExitStatus: uint32(ss.ExitStatus),
ExitedAt: ss.ExitedAt,
})
if err != nil {
return err
}
if _, err := os.Stdout.Write(data); err != nil {
return err
}
return nil
case "start":
opts := StartOpts{
ContainerdBinary: containerdBinaryFlag,
Address: addressFlag,
TTRPCAddress: ttrpcAddress,
}
address, err := manager.Start(ctx, id, opts)
if err != nil {
return err
}
if _, err := os.Stdout.WriteString(address); err != nil {
return err
}
return nil
}
if !config.NoSetupLogger {
ctx, err = setLogger(ctx, id)
if err != nil {
return err
}
}
plugin.Register(&plugin.Registration{
Type: plugin.InternalPlugin,
ID: "shutdown",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return sd, nil
},
})
// Register event plugin
plugin.Register(&plugin.Registration{
Type: plugin.EventPlugin,
ID: "publisher",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return publisher, nil
},
})
var (
initialized = plugin.NewPluginSet()
ttrpcServices = []ttrpcService{}
)
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
for _, p := range plugins {
id := p.URI()
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
initContext := plugin.NewContext(
ctx,
p,
initialized,
// NOTE: Root is empty since the shim does not support persistent storage,
// shim plugins should make use state directory for writing files to disk.
// The state directory will be destroyed when the shim if cleaned up or
// on reboot
"",
bundlePath,
)
initContext.Address = addressFlag
initContext.TTRPCAddress = ttrpcAddress
// load the plugin specific configuration if it is provided
//TODO: Read configuration passed into shim, or from state directory?
//if p.Config != nil {
// pc, err := config.Decode(p)
// if err != nil {
// return nil, err
// }
// initContext.Config = pc
//}
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return fmt.Errorf("could not add plugin result to plugin set: %w", err)
}
instance, err := result.Instance()
if err != nil {
if plugin.IsSkipPlugin(err) {
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
} else {
log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
}
continue
}
if src, ok := instance.(ttrpcService); ok {
logrus.WithField("id", id).Debug("registering ttrpc service")
ttrpcServices = append(ttrpcServices, src)
}
}
server, err := newServer()
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
return fmt.Errorf("failed creating server: %w", err)
}
for _, srv := range ttrpcServices {
if err := srv.RegisterTTRPC(server); err != nil {
return fmt.Errorf("failed to register service: %w", err)
}
}
if err := serve(ctx, server, signals, sd.Shutdown); err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
if err != shutdown.ErrShutdown {
return err
}
}
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
if address, err := ReadAddress("address"); err == nil {
_ = RemoveSocket(address)
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
}
}
// serve serves the ttrpc API over a unix socket in the current working directory
// and blocks until the context is canceled
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func()) error {
dump := make(chan os.Signal, 32)
setupDumpStacks(dump)
path, err := os.Getwd()
if err != nil {
return err
}
l, err := serveListener(socketFlag)
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
return err
}
go func() {
defer l.Close()
if err := server.Serve(ctx, l); err != nil &&
!strings.Contains(err.Error(), "use of closed network connection") {
log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure")
}
}()
logger := log.G(ctx).WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
go handleExitSignals(ctx, logger, shutdown)
return reap(ctx, logger, signals)
}
func dumpStacks(logger *logrus.Entry) {
var (
buf []byte
stackSize int
)
bufferLen := 16384
for stackSize == len(buf) {
buf = make([]byte, bufferLen)
stackSize = runtime.Stack(buf, true)
bufferLen *= 2
}
buf = buf[:stackSize]
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}

View file

@ -0,0 +1,27 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import "github.com/containerd/ttrpc"
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}
func subreaper() error {
return nil
}

View file

@ -0,0 +1,27 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import "github.com/containerd/ttrpc"
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}
func subreaper() error {
return nil
}

View file

@ -0,0 +1,30 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"github.com/containerd/containerd/sys/reaper"
"github.com/containerd/ttrpc"
)
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
}
func subreaper() error {
return reaper.SetSubreaper(1)
}

View file

@ -0,0 +1,113 @@
//go:build !windows
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"fmt"
"io"
"net"
"os"
"os/signal"
"syscall"
"github.com/containerd/containerd/sys/reaper"
"github.com/containerd/fifo"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals(config Config) (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE}
if !config.NoReaper {
smp = append(smp, unix.SIGCHLD)
}
signal.Notify(signals, smp...)
return signals, nil
}
func setupDumpStacks(dump chan<- os.Signal) {
signal.Notify(dump, syscall.SIGUSR1)
}
func serveListener(path string) (net.Listener, error) {
var (
l net.Listener
err error
)
if path == "" {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > socketPathLimit {
return nil, fmt.Errorf("%q: unix socket path too long (> %d)", path, socketPathLimit)
}
l, err = net.Listen("unix", path)
}
if err != nil {
return nil, err
}
logrus.WithField("socket", path).Debug("serving api on socket")
return l, nil
}
func reap(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case <-ctx.Done():
return ctx.Err()
case s := <-signals:
// Exit signals are handled separately from this loop
// They get registered with this channel so that we can ignore such signals for short-running actions (e.g. `delete`)
switch s {
case unix.SIGCHLD:
if err := reaper.Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGPIPE:
}
}
}
}
func handleExitSignals(ctx context.Context, logger *logrus.Entry, cancel context.CancelFunc) {
ch := make(chan os.Signal, 32)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case s := <-ch:
logger.WithField("signal", s).Debugf("Caught exit signal")
cancel()
return
case <-ctx.Done():
return
}
}
}
func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifoDup2(ctx, "log", unix.O_WRONLY, 0700, int(os.Stderr.Fd()))
}

View file

@ -0,0 +1,58 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"errors"
"io"
"net"
"os"
"github.com/containerd/ttrpc"
"github.com/sirupsen/logrus"
)
func setupSignals(config Config) (chan os.Signal, error) {
return nil, errors.New("not supported")
}
func newServer() (*ttrpc.Server, error) {
return nil, errors.New("not supported")
}
func subreaper() error {
return errors.New("not supported")
}
func setupDumpStacks(dump chan<- os.Signal) {
}
func serveListener(path string) (net.Listener, error) {
return nil, errors.New("not supported")
}
func reap(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
return errors.New("not supported")
}
func handleExitSignals(ctx context.Context, logger *logrus.Entry, cancel context.CancelFunc) {
}
func openLog(ctx context.Context, _ string) (io.Writer, error) {
return nil, errors.New("not supported")
}

View file

@ -0,0 +1,169 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"time"
"github.com/containerd/containerd/namespaces"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
exec "golang.org/x/sys/execabs"
)
type CommandConfig struct {
Runtime string
Address string
TTRPCAddress string
Path string
SchedCore bool
Args []string
Opts *types.Any
}
// Command returns the shim command with the provided args and configuration
func Command(ctx context.Context, config *CommandConfig) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
self, err := os.Executable()
if err != nil {
return nil, err
}
args := []string{
"-namespace", ns,
"-address", config.Address,
"-publish-binary", self,
}
args = append(args, config.Args...)
cmd := exec.CommandContext(ctx, config.Runtime, args...)
cmd.Dir = config.Path
cmd.Env = append(
os.Environ(),
"GOMAXPROCS=2",
fmt.Sprintf("%s=%s", ttrpcAddressEnv, config.TTRPCAddress),
)
if config.SchedCore {
cmd.Env = append(cmd.Env, "SCHED_CORE=1")
}
cmd.SysProcAttr = getSysProcAttr()
if config.Opts != nil {
d, err := proto.Marshal(config.Opts)
if err != nil {
return nil, err
}
cmd.Stdin = bytes.NewReader(d)
}
return cmd, nil
}
// BinaryName returns the shim binary name from the runtime name,
// empty string returns means runtime name is invalid
func BinaryName(runtime string) string {
// runtime name should format like $prefix.name.version
parts := strings.Split(runtime, ".")
if len(parts) < 2 {
return ""
}
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
}
// BinaryPath returns the full path for the shim binary from the runtime name,
// empty string returns means runtime name is invalid
func BinaryPath(runtime string) string {
dir := filepath.Dir(runtime)
binary := BinaryName(runtime)
path, err := filepath.Abs(filepath.Join(dir, binary))
if err != nil {
return ""
}
return path
}
// Connect to the provided address
func Connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}
// WritePidFile writes a pid file atomically
func WritePidFile(path string, pid int) error {
path, err := filepath.Abs(path)
if err != nil {
return err
}
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
}
_, err = fmt.Fprintf(f, "%d", pid)
f.Close()
if err != nil {
return err
}
return os.Rename(tempPath, path)
}
// WriteAddress writes a address file atomically
func WriteAddress(path, address string) error {
path, err := filepath.Abs(path)
if err != nil {
return err
}
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
}
_, err = f.WriteString(address)
f.Close()
if err != nil {
return err
}
return os.Rename(tempPath, path)
}
// ErrNoAddress is returned when the address file has no content
var ErrNoAddress = errors.New("no shim address")
// ReadAddress returns the shim's socket address from the path
func ReadAddress(path string) (string, error) {
path, err := filepath.Abs(path)
if err != nil {
return "", err
}
data, err := os.ReadFile(path)
if err != nil {
return "", err
}
if len(data) == 0 {
return "", ErrNoAddress
}
return string(data), nil
}

View file

@ -0,0 +1,173 @@
//go:build !windows
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"crypto/sha256"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/sys"
)
const (
shimBinaryFormat = "containerd-shim-%s-%s"
socketPathLimit = 106
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
// AdjustOOMScore sets the OOM score for the process to the parents OOM score +1
// to ensure that they parent has a lower* score than the shim
// if not already at the maximum OOM Score
func AdjustOOMScore(pid int) error {
parent := os.Getppid()
score, err := sys.GetOOMScoreAdj(parent)
if err != nil {
return fmt.Errorf("get parent OOM score: %w", err)
}
shimScore := score + 1
if err := sys.AdjustOOMScore(pid, shimScore); err != nil {
return fmt.Errorf("set shim OOM score: %w", err)
}
return nil
}
const socketRoot = defaults.DefaultStateDir
// SocketAddress returns a socket address
func SocketAddress(ctx context.Context, socketPath, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
d := sha256.Sum256([]byte(filepath.Join(socketPath, ns, id)))
return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d), nil
}
// AnonDialer returns a dialer for a socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", socket(address).path(), timeout)
}
// AnonReconnectDialer returns a dialer for an existing socket on reconnection
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
return AnonDialer(address, timeout)
}
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
var (
sock = socket(address)
path = sock.path()
)
isAbstract := sock.isAbstract()
if !isAbstract {
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
return nil, fmt.Errorf("%s: %w", path, err)
}
}
l, err := net.Listen("unix", path)
if err != nil {
return nil, err
}
if !isAbstract {
if err := os.Chmod(path, 0600); err != nil {
os.Remove(sock.path())
l.Close()
return nil, err
}
}
return l.(*net.UnixListener), nil
}
const abstractSocketPrefix = "\x00"
type socket string
func (s socket) isAbstract() bool {
return !strings.HasPrefix(string(s), "unix://")
}
func (s socket) path() string {
path := strings.TrimPrefix(string(s), "unix://")
// if there was no trim performed, we assume an abstract socket
if len(path) == len(s) {
path = abstractSocketPrefix + path
}
return path
}
// RemoveSocket removes the socket at the specified address if
// it exists on the filesystem
func RemoveSocket(address string) error {
sock := socket(address)
if !sock.isAbstract() {
return os.Remove(sock.path())
}
return nil
}
// SocketEaddrinuse returns true if the provided error is caused by the
// EADDRINUSE error number
func SocketEaddrinuse(err error) bool {
netErr, ok := err.(*net.OpError)
if !ok {
return false
}
if netErr.Op != "listen" {
return false
}
syscallErr, ok := netErr.Err.(*os.SyscallError)
if !ok {
return false
}
errno, ok := syscallErr.Err.(syscall.Errno)
if !ok {
return false
}
return errno == syscall.EADDRINUSE
}
// CanConnect returns true if the socket provided at the address
// is accepting new connections
func CanConnect(address string) bool {
conn, err := AnonDialer(address, 100*time.Millisecond)
if err != nil {
return false
}
conn.Close()
return true
}

View file

@ -0,0 +1,87 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"fmt"
"net"
"os"
"syscall"
"time"
winio "github.com/Microsoft/go-winio"
)
const shimBinaryFormat = "containerd-shim-%s-%s.exe"
func getSysProcAttr() *syscall.SysProcAttr {
return nil
}
// AnonReconnectDialer returns a dialer for an existing npipe on containerd reconnection
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := winio.DialPipeContext(ctx, address)
if os.IsNotExist(err) {
return nil, fmt.Errorf("npipe not found on reconnect: %w", os.ErrNotExist)
} else if err == context.DeadlineExceeded {
return nil, fmt.Errorf("timed out waiting for npipe %s: %w", address, err)
} else if err != nil {
return nil, err
}
return c, nil
}
// AnonDialer returns a dialer for a npipe
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// If there is nobody serving the pipe we limit the timeout for this case to
// 5 seconds because any shim that would serve this endpoint should serve it
// within 5 seconds.
serveTimer := time.NewTimer(5 * time.Second)
defer serveTimer.Stop()
for {
c, err := winio.DialPipeContext(ctx, address)
if err != nil {
if os.IsNotExist(err) {
select {
case <-serveTimer.C:
return nil, fmt.Errorf("pipe not found before timeout: %w", os.ErrNotExist)
default:
// Wait 10ms for the shim to serve and try again.
time.Sleep(10 * time.Millisecond)
continue
}
} else if err == context.DeadlineExceeded {
return nil, fmt.Errorf("timed out waiting for npipe %s: %w", address, err)
}
return nil, err
}
return c, nil
}
}
// RemoveSocket removes the socket at the specified address if
// it exists on the filesystem
func RemoveSocket(address string) error {
return nil
}

View file

@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package task

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,202 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package containerd.task.v2;
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import weak "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
import "github.com/containerd/containerd/api/types/task/task.proto";
option go_package = "github.com/containerd/containerd/runtime/v2/task;task";
// Shim service is launched for each container and is responsible for owning the IO
// for the container and its additional processes. The shim is also the parent of
// each container and allows reattaching to the IO and receiving the exit status
// for the container processes.
service Task {
rpc State(StateRequest) returns (StateResponse);
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc Pids(PidsRequest) returns (PidsResponse);
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc Stats(StatsRequest) returns (StatsResponse);
rpc Connect(ConnectRequest) returns (ConnectResponse);
rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
}
message CreateTaskRequest {
string id = 1;
string bundle = 2;
repeated containerd.types.Mount rootfs = 3;
bool terminal = 4;
string stdin = 5;
string stdout = 6;
string stderr = 7;
string checkpoint = 8;
string parent_checkpoint = 9;
google.protobuf.Any options = 10;
}
message CreateTaskResponse {
uint32 pid = 1;
}
message DeleteRequest {
string id = 1;
string exec_id = 2;
}
message DeleteResponse {
uint32 pid = 1;
uint32 exit_status = 2;
google.protobuf.Timestamp exited_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message ExecProcessRequest {
string id = 1;
string exec_id = 2;
bool terminal = 3;
string stdin = 4;
string stdout = 5;
string stderr = 6;
google.protobuf.Any spec = 7;
}
message ExecProcessResponse {
}
message ResizePtyRequest {
string id = 1;
string exec_id = 2;
uint32 width = 3;
uint32 height = 4;
}
message StateRequest {
string id = 1;
string exec_id = 2;
}
message StateResponse {
string id = 1;
string bundle = 2;
uint32 pid = 3;
containerd.v1.types.Status status = 4;
string stdin = 5;
string stdout = 6;
string stderr = 7;
bool terminal = 8;
uint32 exit_status = 9;
google.protobuf.Timestamp exited_at = 10 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string exec_id = 11;
}
message KillRequest {
string id = 1;
string exec_id = 2;
uint32 signal = 3;
bool all = 4;
}
message CloseIORequest {
string id = 1;
string exec_id = 2;
bool stdin = 3;
}
message PidsRequest {
string id = 1;
}
message PidsResponse {
repeated containerd.v1.types.ProcessInfo processes = 1;
}
message CheckpointTaskRequest {
string id = 1;
string path = 2;
google.protobuf.Any options = 3;
}
message UpdateTaskRequest {
string id = 1;
google.protobuf.Any resources = 2;
map<string, string> annotations = 3;
}
message StartRequest {
string id = 1;
string exec_id = 2;
}
message StartResponse {
uint32 pid = 1;
}
message WaitRequest {
string id = 1;
string exec_id = 2;
}
message WaitResponse {
uint32 exit_status = 1;
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message StatsRequest {
string id = 1;
}
message StatsResponse {
google.protobuf.Any stats = 1;
}
message ConnectRequest {
string id = 1;
}
message ConnectResponse {
uint32 shim_pid = 1;
uint32 task_pid = 2;
string version = 3;
}
message ShutdownRequest {
string id = 1;
bool now = 2;
}
message PauseRequest {
string id = 1;
}
message ResumeRequest {
string id = 1;
}

View file

@ -0,0 +1,283 @@
//go:build !windows
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reaper
import (
"errors"
"fmt"
"sync"
"syscall"
"time"
runc "github.com/containerd/go-runc"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 32
type subscriber struct {
sync.Mutex
c chan runc.Exit
closed bool
}
func (s *subscriber) close() {
s.Lock()
if s.closed {
s.Unlock()
return
}
close(s.c)
s.closed = true
s.Unlock()
}
func (s *subscriber) do(fn func()) {
s.Lock()
fn()
s.Unlock()
}
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := reap(false)
for _, e := range exits {
done := Default.notify(runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
})
select {
case <-done:
case <-time.After(1 * time.Second):
}
}
return err
}
// Default is the default monitor initialized for the package
var Default = &Monitor{
subscribers: make(map[chan runc.Exit]*subscriber),
}
// Monitor monitors the underlying system for process status changes
type Monitor struct {
sync.Mutex
subscribers map[chan runc.Exit]*subscriber
}
// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
return ec, nil
}
// Wait blocks until a process is signal as dead.
// User should rely on the value of the exit status to determine if the
// command was successful or not.
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}
// WaitTimeout is used to skip the blocked command and kill the left process.
func (m *Monitor) WaitTimeout(c *exec.Cmd, ec chan runc.Exit, timeout time.Duration) (int, error) {
type exitStatusWrapper struct {
status int
err error
}
// capacity can make sure that the following goroutine will not be
// blocked if there is no receiver when timeout.
waitCh := make(chan *exitStatusWrapper, 1)
go func() {
defer close(waitCh)
status, err := m.Wait(c, ec)
waitCh <- &exitStatusWrapper{
status: status,
err: err,
}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-timer.C:
syscall.Kill(c.Process.Pid, syscall.SIGKILL)
return 0, fmt.Errorf("timeout %v for cmd(pid=%d): %s, %s", timeout, c.Process.Pid, c.Path, c.Args)
case res := <-waitCh:
return res.status, res.err
}
}
// Subscribe to process exit changes
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.subscribers[c] = &subscriber{
c: c,
}
m.Unlock()
return c
}
// Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
s, ok := m.subscribers[c]
if !ok {
m.Unlock()
return
}
s.close()
delete(m.subscribers, c)
m.Unlock()
}
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
out := make(map[chan runc.Exit]*subscriber)
m.Lock()
for k, v := range m.subscribers {
out[k] = v
}
m.Unlock()
return out
}
func (m *Monitor) notify(e runc.Exit) chan struct{} {
const timeout = 1 * time.Millisecond
var (
done = make(chan struct{}, 1)
timer = time.NewTimer(timeout)
success = make(map[chan runc.Exit]struct{})
)
stop(timer, true)
go func() {
defer close(done)
for {
var (
failed int
subscribers = m.getSubscribers()
)
for _, s := range subscribers {
s.do(func() {
if s.closed {
return
}
if _, ok := success[s.c]; ok {
return
}
timer.Reset(timeout)
recv := true
select {
case s.c <- e:
success[s.c] = struct{}{}
case <-timer.C:
recv = false
failed++
}
stop(timer, recv)
})
}
// all subscribers received the message
if failed == 0 {
return
}
}
}()
return done
}
func stop(timer *time.Timer, recv bool) {
if !timer.Stop() && recv {
<-timer.C
}
}
// exit is the wait4 information from an exited process
type exit struct {
Pid int
Status int
}
// reap reaps all child processes for the calling process and returns their
// exit information
func reap(wait bool) (exits []exit, err error) {
var (
ws unix.WaitStatus
rus unix.Rusage
)
flag := unix.WNOHANG
if wait {
flag = 0
}
for {
pid, err := unix.Wait4(-1, &ws, flag, &rus)
if err != nil {
if err == unix.ECHILD {
return exits, nil
}
return exits, err
}
if pid <= 0 {
return exits, nil
}
exits = append(exits, exit{
Pid: pid,
Status: exitStatus(ws),
})
}
}
const exitSignalOffset = 128
// exitStatus returns the correct exit status for a process based on if it
// was signaled or exited cleanly
func exitStatus(status unix.WaitStatus) int {
if status.Signaled() {
return exitSignalOffset + int(status.Signal())
}
return status.ExitStatus()
}

View file

@ -0,0 +1,39 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reaper
import (
"unsafe"
"golang.org/x/sys/unix"
)
// SetSubreaper sets the value i as the subreaper setting for the calling process
func SetSubreaper(i int) error {
return unix.Prctl(unix.PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
}
// GetSubreaper returns the subreaper setting for the calling process
func GetSubreaper() (int, error) {
var i uintptr
if err := unix.Prctl(unix.PR_GET_CHILD_SUBREAPER, uintptr(unsafe.Pointer(&i)), 0, 0, 0); err != nil {
return -1, err
}
return int(i), nil
}

6
vendor/modules.txt vendored
View file

@ -167,6 +167,7 @@ github.com/containerd/containerd/api/services/leases/v1
github.com/containerd/containerd/api/services/namespaces/v1 github.com/containerd/containerd/api/services/namespaces/v1
github.com/containerd/containerd/api/services/snapshots/v1 github.com/containerd/containerd/api/services/snapshots/v1
github.com/containerd/containerd/api/services/tasks/v1 github.com/containerd/containerd/api/services/tasks/v1
github.com/containerd/containerd/api/services/ttrpc/events/v1
github.com/containerd/containerd/api/services/version/v1 github.com/containerd/containerd/api/services/version/v1
github.com/containerd/containerd/api/types github.com/containerd/containerd/api/types
github.com/containerd/containerd/api/types/task github.com/containerd/containerd/api/types/task
@ -204,6 +205,8 @@ github.com/containerd/containerd/pkg/cap
github.com/containerd/containerd/pkg/dialer github.com/containerd/containerd/pkg/dialer
github.com/containerd/containerd/pkg/kmutex github.com/containerd/containerd/pkg/kmutex
github.com/containerd/containerd/pkg/seccomp github.com/containerd/containerd/pkg/seccomp
github.com/containerd/containerd/pkg/shutdown
github.com/containerd/containerd/pkg/ttrpcutil
github.com/containerd/containerd/pkg/userns github.com/containerd/containerd/pkg/userns
github.com/containerd/containerd/platforms github.com/containerd/containerd/platforms
github.com/containerd/containerd/plugin github.com/containerd/containerd/plugin
@ -217,6 +220,8 @@ github.com/containerd/containerd/remotes/errors
github.com/containerd/containerd/rootfs github.com/containerd/containerd/rootfs
github.com/containerd/containerd/runtime/linux/runctypes github.com/containerd/containerd/runtime/linux/runctypes
github.com/containerd/containerd/runtime/v2/runc/options github.com/containerd/containerd/runtime/v2/runc/options
github.com/containerd/containerd/runtime/v2/shim
github.com/containerd/containerd/runtime/v2/task
github.com/containerd/containerd/services github.com/containerd/containerd/services
github.com/containerd/containerd/services/content/contentserver github.com/containerd/containerd/services/content/contentserver
github.com/containerd/containerd/services/introspection github.com/containerd/containerd/services/introspection
@ -224,6 +229,7 @@ github.com/containerd/containerd/services/server/config
github.com/containerd/containerd/snapshots github.com/containerd/containerd/snapshots
github.com/containerd/containerd/snapshots/proxy github.com/containerd/containerd/snapshots/proxy
github.com/containerd/containerd/sys github.com/containerd/containerd/sys
github.com/containerd/containerd/sys/reaper
github.com/containerd/containerd/version github.com/containerd/containerd/version
# github.com/containerd/continuity v0.3.0 # github.com/containerd/continuity v0.3.0
## explicit; go 1.17 ## explicit; go 1.17