| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package pod
- import (
- "context"
- "fmt"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/onecloud/pkg/util/exec"
- "yunion.io/x/onecloud/pkg/util/procutils"
- )
- type CRI interface {
- Version(ctx context.Context) (*runtimeapi.VersionResponse, error)
- ListPods(ctx context.Context, opts ListPodOptions) ([]*runtimeapi.PodSandbox, error)
- RunPod(ctx context.Context, podConfig *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
- StopPod(ctx context.Context, req *runtimeapi.StopPodSandboxRequest) error
- RemovePod(ctx context.Context, podId string) error
- CreateContainer(ctx context.Context, podId string, podConfig *runtimeapi.PodSandboxConfig, ctrConfig *runtimeapi.ContainerConfig, withPull bool) (string, error)
- StartContainer(ctx context.Context, id string) error
- StopContainer(ctx context.Context, ctrId string, timeout int64, tryRemove bool, force bool) error
- RemoveContainer(ctx context.Context, ctrId string) error
- RunContainers(ctx context.Context, podConfig *runtimeapi.PodSandboxConfig, containerConfigs []*runtimeapi.ContainerConfig, runtimeHandler string) (*RunContainersResponse, error)
- ListContainers(ctx context.Context, opts ListContainerOptions) ([]*runtimeapi.Container, error)
- ContainerStatus(ctx context.Context, ctrId string) (*runtimeapi.ContainerStatusResponse, error)
- ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
- PullImage(ctx context.Context, req *runtimeapi.PullImageRequest) (*runtimeapi.PullImageResponse, error)
- ImageStatus(ctx context.Context, req *runtimeapi.ImageStatusRequest) (*runtimeapi.ImageStatusResponse, error)
- ExecSync(ctx context.Context, ctrId string, command []string, timeout int64) (*ExecSyncResponse, error)
- // lower layer client
- GetImageClient() runtimeapi.ImageServiceClient
- GetRuntimeClient() runtimeapi.RuntimeServiceClient
- }
// ListContainerOptions carries the selection criteria for listing
// containers.
type ListContainerOptions struct {
	// Id is the container ID to match.
	Id string
	// PodId is the ID of the pod sandbox the container belongs to.
	PodId string
	// NameRegexp is a regular expression pattern matched against the pod or
	// container name.
	NameRegexp string
	// PodNamespaceRegexp is a regular expression pattern matched against the
	// pod namespace.
	PodNamespaceRegexp string
	// State is the state to filter on.
	State string
	// Verbose asks for verbose information.
	Verbose bool
	// Labels are label selectors.
	Labels map[string]string
	// Image is the image used by the container.
	Image string
}
- type crictl struct {
- endpoint string
- timeout time.Duration
- conn *grpc.ClientConn
- imgCli runtimeapi.ImageServiceClient
- runCli runtimeapi.RuntimeServiceClient
- }
- func NewCRI(endpoint string, timeout time.Duration) (CRI, error) {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- var dialOpts []grpc.DialOption
- maxMsgSize := 1024 * 1024 * 16
- dialOpts = append(dialOpts,
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
- conn, err := grpc.DialContext(ctx, endpoint, dialOpts...)
- if err != nil {
- return nil, errors.Wrapf(err, "Connect remote endpoint %q failed", endpoint)
- }
- imgCli := runtimeapi.NewImageServiceClient(conn)
- runCli := runtimeapi.NewRuntimeServiceClient(conn)
- return &crictl{
- endpoint: endpoint,
- timeout: timeout,
- conn: conn,
- imgCli: imgCli,
- runCli: runCli,
- }, nil
- }
- func (c crictl) GetImageClient() runtimeapi.ImageServiceClient {
- return c.imgCli
- }
- func (c crictl) GetRuntimeClient() runtimeapi.RuntimeServiceClient {
- return c.runCli
- }
- func (c crictl) Version(ctx context.Context) (*runtimeapi.VersionResponse, error) {
- return c.GetRuntimeClient().Version(ctx, &runtimeapi.VersionRequest{})
- }
- func (c crictl) ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
- ctx, cancel := context.WithTimeout(ctx, c.timeout)
- defer cancel()
- resp, err := c.GetImageClient().ListImages(ctx, &runtimeapi.ListImagesRequest{
- Filter: filter,
- })
- if err != nil {
- return nil, errors.Wrapf(err, "ListImages with filter %s", filter.String())
- }
- return resp.Images, nil
- }
- func (c crictl) RunPod(ctx context.Context, podConfig *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
- req := &runtimeapi.RunPodSandboxRequest{
- Config: podConfig,
- RuntimeHandler: runtimeHandler,
- }
- log.Infof("RunPodSandboxRequest: %v", req)
- r, err := c.GetRuntimeClient().RunPodSandbox(ctx, req)
- if err != nil {
- return "", errors.Wrapf(err, "RunPod with request: %s", req.String())
- }
- return r.GetPodSandboxId(), nil
- }
- func (c crictl) StopPod(ctx context.Context, req *runtimeapi.StopPodSandboxRequest) error {
- _, err := c.GetRuntimeClient().StopPodSandbox(ctx, req)
- if err != nil {
- return errors.Wrap(err, "StopPodSandbox")
- }
- return nil
- }
- // PullImageWithSandbox sends a PullImageRequest to the server and parses
- // the returned PullImageResponses.
- func (c crictl) PullImageWithSandbox(ctx context.Context, image string, auth *runtimeapi.AuthConfig, sandbox *runtimeapi.PodSandboxConfig, ann map[string]string) (*runtimeapi.PullImageResponse, error) {
- req := &runtimeapi.PullImageRequest{
- Image: &runtimeapi.ImageSpec{
- Image: image,
- Annotations: ann,
- },
- Auth: auth,
- SandboxConfig: sandbox,
- }
- log.Infof("PullImageRequest: %v", req)
- r, err := c.GetImageClient().PullImage(ctx, req)
- if err != nil {
- return nil, errors.Wrapf(err, "PullImage with %s", req)
- }
- return r, nil
- }
- // CreateContainer sends a CreateContainerRequest to the server, and parses
- // the returned CreateContainerResponse.
- func (c crictl) CreateContainer(ctx context.Context,
- podId string, podConfig *runtimeapi.PodSandboxConfig,
- ctrConfig *runtimeapi.ContainerConfig, withPull bool) (string, error) {
- req := &runtimeapi.CreateContainerRequest{
- PodSandboxId: podId,
- Config: ctrConfig,
- SandboxConfig: podConfig,
- }
- image := ctrConfig.GetImage().GetImage()
- // When there is a withPull request or the image default mode is to
- // pull-image-on-create(true) and no-pull was not set we pull the image when
- // they ask for a create as a helper on the cli to reduce extra steps. As a
- // reminder if the image is already in cache only the manifest will be pulled
- // down to verify.
- if withPull {
- // Try to pull the image before container creation
- ann := ctrConfig.GetImage().GetAnnotations()
- resp, err := c.PullImageWithSandbox(ctx, image, nil, nil, ann)
- if err != nil {
- return "", errors.Wrap(err, "PullImageWithSandbox")
- }
- log.Infof("Pull image %s", resp.String())
- }
- log.Debugf("CreateContainerRequest pod %s: %v", podId, req)
- r, err := c.GetRuntimeClient().CreateContainer(ctx, req)
- if err != nil {
- return "", errors.Wrapf(err, "CreateContainer with: %s", req)
- }
- return r.GetContainerId(), nil
- }
- // StartContainer sends a StartContainerRequest to the server, and parses
- // the returned StartContainerResponse.
- func (c crictl) StartContainer(ctx context.Context, id string) error {
- if id == "" {
- return errors.Error("Id can't be empty")
- }
- if _, err := c.GetRuntimeClient().StartContainer(ctx, &runtimeapi.StartContainerRequest{
- ContainerId: id,
- }); err != nil {
- return errors.Wrapf(err, "StartContainer %s", id)
- }
- return nil
- }
// RunContainersResponse is the result of RunContainers: the ID of the pod
// sandbox that was run and the IDs of the containers started in it.
type RunContainersResponse struct {
	// PodId is the pod sandbox ID.
	PodId string `json:"pod_id"`
	// ContainerIds lists the IDs of the started containers.
	ContainerIds []string `json:"container_ids"`
}
- // RunContainers starts containers in the provided pod sandbox
- func (c crictl) RunContainers(
- ctx context.Context, podConfig *runtimeapi.PodSandboxConfig, containerConfigs []*runtimeapi.ContainerConfig, runtimeHandler string) (*RunContainersResponse, error) {
- // Create the pod
- podId, err := c.RunPod(ctx, podConfig, runtimeHandler)
- if err != nil {
- return nil, errors.Wrap(err, "RunPod")
- }
- ret := &RunContainersResponse{
- PodId: podId,
- ContainerIds: make([]string, 0),
- }
- // Create the containers
- for idx, ctr := range containerConfigs {
- // Create the container
- ctrId, err := c.CreateContainer(ctx, podId, podConfig, ctr, true)
- if err != nil {
- return nil, errors.Wrapf(err, "CreateContainer %d", idx)
- }
- // Start the container
- if err := c.StartContainer(ctx, ctrId); err != nil {
- return nil, errors.Wrapf(err, "StarContainer %d", idx)
- }
- ret.ContainerIds = append(ret.ContainerIds, ctrId)
- }
- return ret, nil
- }
- func (c crictl) ListContainers(ctx context.Context, opts ListContainerOptions) ([]*runtimeapi.Container, error) {
- filter := &runtimeapi.ContainerFilter{
- Id: opts.Id,
- PodSandboxId: opts.PodId,
- }
- st := &runtimeapi.ContainerStateValue{}
- if opts.State != "" {
- st.State = runtimeapi.ContainerState_CONTAINER_UNKNOWN
- switch strings.ToLower(opts.State) {
- case "created":
- st.State = runtimeapi.ContainerState_CONTAINER_CREATED
- case "running":
- st.State = runtimeapi.ContainerState_CONTAINER_RUNNING
- case "exited":
- st.State = runtimeapi.ContainerState_CONTAINER_EXITED
- case "unknown":
- st.State = runtimeapi.ContainerState_CONTAINER_UNKNOWN
- default:
- return nil, fmt.Errorf("unsupported state: %q", opts.State)
- }
- filter.State = st
- }
- if opts.Labels != nil {
- filter.LabelSelector = opts.Labels
- }
- req := &runtimeapi.ListContainersRequest{
- Filter: filter,
- }
- r, err := c.GetRuntimeClient().ListContainers(ctx, req)
- if err != nil {
- return nil, errors.Wrap(err, "ListContainers")
- }
- return r.Containers, nil
- }
// ListPodOptions carries the selection criteria for ListPods.
type ListPodOptions struct {
	// Id is the pod sandbox ID to match.
	Id string
	// State is the sandbox state to filter on; ListPods accepts "ready" and
	// "notready" (case-insensitive), or empty for no state filter.
	State string
}
- func (c crictl) ListPods(ctx context.Context, opts ListPodOptions) ([]*runtimeapi.PodSandbox, error) {
- filter := &runtimeapi.PodSandboxFilter{
- Id: opts.Id,
- }
- if opts.State != "" {
- st := &runtimeapi.PodSandboxStateValue{}
- st.State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
- switch strings.ToLower(opts.State) {
- case "ready":
- st.State = runtimeapi.PodSandboxState_SANDBOX_READY
- case "notready":
- st.State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
- default:
- return nil, errors.Errorf("Invalid state: %q", opts.State)
- }
- filter.State = st
- }
- req := &runtimeapi.ListPodSandboxRequest{
- Filter: filter,
- }
- ret, err := c.GetRuntimeClient().ListPodSandbox(ctx, req)
- if err != nil {
- return nil, errors.Wrap(err, "ListPodSandbox")
- }
- return ret.Items, nil
- }
- func (c crictl) RemovePod(ctx context.Context, podId string) error {
- maxTries := 10
- interval := 5 * time.Second
- errs := []error{}
- for tries := 0; tries < maxTries; tries++ {
- err := func() error {
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- return c.removePod(ctx, podId)
- }()
- if err == nil {
- return nil
- }
- if strings.Contains(err.Error(), "code = NotFound") {
- return nil
- }
- dur := interval * time.Duration(tries+1)
- log.Warningf("try to remove pod %s after %s: %v", podId, dur, err)
- errs = append(errs, errors.Wrapf(err, "try %d", tries))
- time.Sleep(dur)
- }
- return errors.NewAggregate(errs)
- }
- func (c crictl) removePod(ctx context.Context, podId string) error {
- if _, err := c.GetRuntimeClient().RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{
- PodSandboxId: podId,
- }); err != nil {
- return errors.Wrap(err, "RemovePodSandbox")
- }
- return nil
- }
- func (c crictl) stopContainerWithRetry(ctx context.Context, ctrId string, timeout int64, maxTries int) error {
- interval := 5 * time.Second
- errs := []error{}
- for tries := 0; tries < maxTries; tries++ {
- err := func() error {
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
- defer cancel()
- return c.stopContainer(ctx, ctrId, timeout)
- }()
- if err == nil {
- return nil
- }
- if strings.Contains(err.Error(), "code = NotFound") {
- return nil
- }
- dur := interval * time.Duration(tries+1)
- log.Warningf("try to restop container %s after %s, timeout: %d: %v", ctrId, dur, timeout, err)
- // set timeout to 0 to stop forcely
- timeout = 0
- errs = append(errs, errors.Wrapf(err, "try %d", tries))
- time.Sleep(dur)
- }
- return errors.NewAggregate(errs)
- }
- func (c crictl) StopContainer(ctx context.Context, ctrId string, timeout int64, tryRemove bool, force bool) error {
- errs := []error{}
- isStopped := false
- if force {
- if err := c.forceKillContainer(ctx, ctrId); err != nil {
- log.Infof("force kill container %s error: %v", ctrId, err)
- errs = append(errs, errors.Wrap(err, "forceKillContainer"))
- } else {
- isStopped = true
- }
- } else {
- maxTries := 10
- if err := c.stopContainerWithRetry(ctx, ctrId, timeout, maxTries); err != nil {
- errs = append(errs, errors.Wrap(err, "stopContainer"))
- } else {
- isStopped = true
- }
- }
- // 尝试 force kill 容器
- if !force && !isStopped {
- if err := c.forceKillContainer(ctx, ctrId); err != nil {
- log.Infof("try force kill container %s error: %v", ctrId, err)
- errs = append(errs, errors.Wrap(err, "retry forceKillContainer"))
- } else {
- isStopped = true
- }
- }
- if tryRemove {
- // try force remove container
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
- defer cancel()
- if err := c.RemoveContainer(ctx, ctrId); err != nil {
- errs = append(errs, errors.Wrapf(err, "try remove container %s", ctrId))
- } else {
- return nil
- }
- }
- if isStopped {
- return nil
- }
- return errors.NewAggregate(errs)
- }
- func (c crictl) stopContainer(ctx context.Context, ctrId string, timeout int64) error {
- if _, err := c.GetRuntimeClient().StopContainer(ctx, &runtimeapi.StopContainerRequest{
- ContainerId: ctrId,
- Timeout: timeout,
- }); err != nil {
- return errors.Wrap(err, "StopContainer")
- }
- return nil
- }
- func (c crictl) forceKillContainer(ctx context.Context, ctrId string) error {
- cs, err := c.containerStatus(ctx, ctrId, true)
- if err != nil {
- return errors.Wrap(err, "get containerStatus")
- }
- info := cs.GetInfo()
- infoStr := info["info"]
- if infoStr == "" {
- return errors.Errorf("empty info: %s", infoStr)
- }
- infoObj, err := jsonutils.ParseString(infoStr)
- if err != nil {
- return errors.Wrapf(err, "invalid info: %s", infoStr)
- }
- pidInt, err := infoObj.Int("pid")
- if err != nil {
- return errors.Wrapf(err, "get pid from %s", infoObj)
- }
- pid := fmt.Sprintf("%d", pidInt)
- // get ppid
- pStatusFile := filepath.Join("/proc", pid, "task", pid, "status")
- out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", fmt.Sprintf("cat %s | grep PPid: | awk '{print $2}'", pStatusFile)).Output()
- if err != nil {
- return errors.Wrapf(err, "get ppid from %s, out: %s", pStatusFile, out)
- }
- ppidStr := strings.TrimSpace(string(out))
- ppid, err := strconv.Atoi(ppidStr)
- if err != nil {
- return errors.Wrapf(err, "invalid ppid str %s from %s", ppidStr, pStatusFile)
- }
- ppCmdlineFile := filepath.Join("/proc", ppidStr, "cmdline")
- ppCmdline, err := procutils.NewRemoteCommandAsFarAsPossible("cat", ppCmdlineFile).Output()
- if err != nil {
- return errors.Wrapf(err, "get cmdline from %s, out: %s", ppCmdlineFile, ppCmdline)
- }
- log.Infof("try to kill container %s, pid %s parent process(%d): %s", ctrId, pid, ppid, ppCmdline)
- killOut, err := procutils.NewRemoteCommandAsFarAsPossible("kill", "-9", ppidStr).Output()
- if err != nil {
- killErr := errors.Wrapf(err, "kill -9 %s, out: %s", ppidStr, killOut)
- log.Errorf("kill container %s, pid %s parent process(%d): %s, error: %v", ctrId, pid, ppid, ppCmdline, killErr)
- return killErr
- }
- if err := c.stopContainerWithRetry(ctx, ctrId, 0, 5); err != nil {
- return errors.Wrapf(err, "stop container %s after kill parent process", ctrId)
- }
- return nil
- }
- func (c crictl) RemoveContainer(ctx context.Context, ctrId string) error {
- _, err := c.GetRuntimeClient().RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
- ContainerId: ctrId,
- })
- if err != nil {
- return errors.Wrap(err, "RemoveContainer")
- }
- return nil
- }
- func (c crictl) ContainerStatus(ctx context.Context, ctrId string) (*runtimeapi.ContainerStatusResponse, error) {
- return c.containerStatus(ctx, ctrId, false)
- }
- func (c crictl) containerStatus(ctx context.Context, ctrId string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) {
- req := &runtimeapi.ContainerStatusRequest{
- ContainerId: ctrId,
- Verbose: verbose,
- }
- return c.GetRuntimeClient().ContainerStatus(ctx, req)
- }
- func (c crictl) PullImage(ctx context.Context, req *runtimeapi.PullImageRequest) (*runtimeapi.PullImageResponse, error) {
- resp, err := c.GetImageClient().PullImage(ctx, req)
- if err != nil {
- return nil, errors.Wrapf(err, "PullImage")
- }
- return resp, nil
- }
- func (c crictl) ImageStatus(ctx context.Context, req *runtimeapi.ImageStatusRequest) (*runtimeapi.ImageStatusResponse, error) {
- return c.GetImageClient().ImageStatus(ctx, req)
- }
// ExecSyncResponse holds what ExecSync returns for a command that exited
// with code 0: its captured output streams and the exit code.
type ExecSyncResponse struct {
	// Stdout is the captured standard output.
	Stdout []byte
	// Stderr is the captured standard error.
	Stderr []byte
	// ExitCode is the command's exit code.
	ExitCode int32
}
- func (c crictl) ExecSync(ctx context.Context, ctrId string, command []string, timeout int64) (*ExecSyncResponse, error) {
- cli := c.GetRuntimeClient()
- resp, err := cli.ExecSync(ctx, &runtimeapi.ExecSyncRequest{
- ContainerId: ctrId,
- Cmd: command,
- Timeout: timeout,
- })
- if err != nil {
- return nil, errors.Wrapf(err, "exec sync container %s with command: %v", ctrId, command)
- }
- if resp.GetExitCode() != 0 {
- return nil, exec.CodeExitError{
- Err: errors.Errorf("stdout: %s, stderr: %s, exited: %d", resp.GetStdout(), resp.GetStderr(), resp.GetExitCode()),
- Code: int(resp.ExitCode),
- }
- }
- return &ExecSyncResponse{
- Stdout: resp.Stdout,
- Stderr: resp.Stderr,
- ExitCode: resp.ExitCode,
- }, nil
- }
|