// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package guestman

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opencontainers/runtime-spec/specs-go"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"

	"yunion.io/x/jsonutils"
	"yunion.io/x/log"
	"yunion.io/x/pkg/errors"
	"yunion.io/x/pkg/util/sets"
	"yunion.io/x/pkg/utils"

	"yunion.io/x/onecloud/pkg/apis"
	computeapi "yunion.io/x/onecloud/pkg/apis/compute"
	hostapi "yunion.io/x/onecloud/pkg/apis/host"
	"yunion.io/x/onecloud/pkg/hostman/container/device"
	"yunion.io/x/onecloud/pkg/hostman/container/lifecycle"
	"yunion.io/x/onecloud/pkg/hostman/container/prober"
	proberesults "yunion.io/x/onecloud/pkg/hostman/container/prober/results"
	"yunion.io/x/onecloud/pkg/hostman/container/snapshot_service"
	"yunion.io/x/onecloud/pkg/hostman/container/status"
	"yunion.io/x/onecloud/pkg/hostman/container/volume_mount"
	"yunion.io/x/onecloud/pkg/hostman/container/volume_mount/disk"
	_ "yunion.io/x/onecloud/pkg/hostman/container/volume_mount/disk"
	"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
	"yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime"
	"yunion.io/x/onecloud/pkg/hostman/guestman/pod/statusman"
	deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis"
	"yunion.io/x/onecloud/pkg/hostman/hostinfo"
	"yunion.io/x/onecloud/pkg/hostman/hostutils"
	"yunion.io/x/onecloud/pkg/hostman/isolated_device"
	_ "yunion.io/x/onecloud/pkg/hostman/isolated_device/container_device/cdi"
	"yunion.io/x/onecloud/pkg/hostman/options"
	"yunion.io/x/onecloud/pkg/hostman/storageman"
	"yunion.io/x/onecloud/pkg/httperrors"
	"yunion.io/x/onecloud/pkg/mcclient"
	"yunion.io/x/onecloud/pkg/mcclient/auth"
	computemod "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
	imagemod "yunion.io/x/onecloud/pkg/mcclient/modules/image"
	"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
	"yunion.io/x/onecloud/pkg/util/fileutils2"
	"yunion.io/x/onecloud/pkg/util/mountutils"
	podutil "yunion.io/x/onecloud/pkg/util/pod"
	containerdutil "yunion.io/x/onecloud/pkg/util/pod/containerd"
	"yunion.io/x/onecloud/pkg/util/pod/logs"
	"yunion.io/x/onecloud/pkg/util/pod/nerdctl"
	"yunion.io/x/onecloud/pkg/util/procutils"
)

// startContainerProbeManager wires up the container prober: liveness and
// startup probe results feed a prober.Manager that execs probe commands
// through a containerRunner backed by this guest manager.
func (m *SGuestManager) startContainerProbeManager() {
	livenessManager := proberesults.NewManager()
	startupManager := proberesults.NewManager()
	man := prober.NewManager(status.NewManager(), livenessManager, startupManager, newContainerRunner(m))
	m.containerProbeManager = man
	man.Start()
}

// GetContainerProbeManager returns the prober manager started by
// startContainerProbeManager.
func (m *SGuestManager) GetContainerProbeManager() prober.Manager {
	return m.containerProbeManager
}

func newContainerRunner(man *SGuestManager) *containerRunner {
	return &containerRunner{man}
}

// containerRunner adapts SGuestManager to the prober's "run a command inside a
// container" dependency.
type containerRunner struct {
	manager *SGuestManager
}

// RunInContainer synchronously executes cmd inside the container identified by
// (podId, containerId) via CRI ExecSync and returns stdout concatenated with
// stderr.
func (cr *containerRunner) RunInContainer(podId string, containerId string, cmd []string, timeout time.Duration) ([]byte, error) {
	srv, ok := cr.manager.GetServer(podId)
	if !ok {
		return nil, errors.Wrapf(httperrors.ErrNotFound, "server %s not found", podId)
	}
	s := srv.(*sPodGuestInstance)
	ctrCriId, err := s.getContainerCRIId(containerId)
	if err != nil {
		return nil, errors.Wrap(err, "get container cri id")
	}
	resp, err := s.getCRI().ExecSync(context.Background(), ctrCriId, cmd, int64(timeout.Seconds()))
	if err != nil {
		return nil, errors.Wrapf(err, "exec sync %#v to %s", cmd, ctrCriId)
	}
	return append(resp.Stdout, resp.Stderr...), nil
}

// PodInstance is the contract a pod-type guest instance exposes to the rest of
// the host manager: container lifecycle, status sync, exec, image operations
// and volume-mount management.
type PodInstance interface {
	GuestRuntimeInstance
	GetCRIId() string
	GetContainerById(ctrId string) *hostapi.ContainerDesc
	GetContainerByCRIId(criId string) (*hostapi.ContainerDesc, error)
	CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error)
	StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error)
	StartLocalContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error)
	DeleteContainer(ctx context.Context, cred mcclient.TokenCredential, id string) (jsonutils.JSONObject, error)
	SyncStatus(reason string, ctrId string)
	SyncContainerStatus(ctx context.Context, cred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error)
	StopContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerStopInput) (jsonutils.JSONObject, error)
	GetContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error)
	IsPrimaryContainer(ctrId string) bool
	StopAll(ctx context.Context) error
	PullImage(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error)
	SaveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string) (jsonutils.JSONObject, error)
	ExecContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecInput) (*url.URL, error)
	ContainerExecSync(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecSyncInput) (jsonutils.JSONObject, error)
	SetContainerResourceLimit(ctrId string, limit *apis.ContainerResources) (jsonutils.JSONObject, error)
	CommitContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCommitInput) (jsonutils.JSONObject, error)
	AddContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountAddPostOverlayInput) error
	RemoveContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountRemovePostOverlayInput) error
	ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error
	// for monitoring
	GetVolumeMountUsages() (map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage, error)
	IsInternalStopped(ctrCriId string) (*ContainerExpectedStatus, bool)
	IsInternalRemoved(ctrCriId string) bool
	GetPodContainerCriIds() []string
	// For container log rotation: log dir, relative log path per container, and ctrId->criId map
	GetPodLogDir() string
	GetContainerLogPath(ctrId string) string
	ListContainerCriIds() map[string]string
}

// sContainer is the locally persisted record mapping a container id to its
// index and CRI-side id.
type sContainer struct {
	Id    string `json:"id"`
	Index int    `json:"index"`
	CRIId string `json:"cri_id"`
}

func newContainer(id string) *sContainer {
	return &sContainer{
		Id: id,
	}
}

// startStatHelper manages "started" marker files under the pod home dir.
// Their presence after a host restart is how dirty shutdowns are detected.
type startStatHelper struct {
	podId   string
	homeDir string
}

func newStartStatHelper(podId string, homeDir string) *startStatHelper {
	return &startStatHelper{
		podId:   podId,
		homeDir: homeDir,
	}
}

func (h startStatHelper) getPodFile() string {
	return filepath.Join(h.homeDir, "pod-start.stat")
}

func (h startStatHelper) IsPodFileExists() bool {
	return fileutils2.Exists(h.getPodFile())
}

// createStatFile creates the marker file fp if it does not already exist.
func (h startStatHelper) createStatFile(fp string) error {
	if fileutils2.Exists(fp) {
		return nil
	}
	if err := podutil.EnsureFile(fp, "", "755"); err != nil {
		return errors.Wrapf(err, "ensure file %s", fp)
	}
	return nil
}

// removeStatFile removes fp, tolerating a concurrent removal ("no such file
// or directory" errors are ignored).
func (h startStatHelper) removeStatFile(fp string) error {
	if !fileutils2.Exists(fp) {
		return nil
	}
	if err := os.Remove(fp); err != nil && !strings.Contains(err.Error(), "no such file or directory") {
		return errors.Wrapf(err, "remove file %s", fp)
	}
	return nil
}

func (h startStatHelper) CreatePodFile() error {
	return h.createStatFile(h.getPodFile())
}

func (h startStatHelper) RemovePodFile() error {
	return h.removeStatFile(h.getPodFile())
}

func (h startStatHelper) getContainerFile(ctrId string) string {
	return filepath.Join(h.homeDir, fmt.Sprintf("container-start-%s.stat", ctrId))
}

func (h startStatHelper) IsContainerFileExists(ctrId string) bool {
	return fileutils2.Exists(h.getContainerFile(ctrId))
}

func (h startStatHelper) CreateContainerFile(ctrId string) error {
	return h.createStatFile(h.getContainerFile(ctrId))
}

func (h startStatHelper) RemoveContainerFile(ctrId string) error {
	return h.removeStatFile(h.getContainerFile(ctrId))
}

// sPodGuestInstance is the pod (container-based) implementation of a guest
// instance managed by SGuestManager.
type sPodGuestInstance struct {
	*sBaseGuestInstance
	// containers maps container id -> locally persisted container record
	containers map[string]*sContainer
	// startStat tracks start marker files used for dirty-shutdown detection
	startStat *startStatHelper
	// expectedStatus is the status the pod is expected to converge to
	expectedStatus *PodExpectedStatus
	// startPodLock serializes startPod invocations
	startPodLock sync.Mutex
	// saveContainerLock serializes container-desc persistence
	saveContainerLock sync.Mutex
}

// newPodGuestInstance builds a pod guest instance; it aborts the process if
// the expected-status store under the pod home dir cannot be initialized.
func newPodGuestInstance(id string, man *SGuestManager) PodInstance {
	p := &sPodGuestInstance{
		sBaseGuestInstance: newBaseGuestInstance(id, man, computeapi.HYPERVISOR_POD),
		containers:         make(map[string]*sContainer),
		startPodLock:       sync.Mutex{},
		saveContainerLock:  sync.Mutex{},
	}
	es, err := NewPodExpectedStatus(p.HomeDir(), computeapi.VM_UNKNOWN)
	if err != nil {
		log.Fatalf("NewPodExpectedStatus failed of %s: %s", p.HomeDir(), err)
	}
	p.expectedStatus = es
	p.startStat = newStartStatHelper(id, p.HomeDir())
	return p
}

// CleanGuest removes the CRI pod sandbox (looked up by stored CRI id, falling
// back to a CRI query by pod uid) and then deletes the pod home directory.
func (s *sPodGuestInstance) CleanGuest(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
	var err error
	criId := s.GetCRIId()
	if criId == "" {
		criId, err = s.getPodIdFromCRI()
		if err != nil && !IsContainerNotFoundError(err) {
			return nil, errors.Wrapf(err, "get cri pod id")
		}
	}
	if criId != "" {
		if err := s.getCRI().RemovePod(ctx, criId); err != nil {
			return nil, errors.Wrapf(err, "RemovePod with cri_id %q", criId)
		}
	}
	return nil, DeleteHomeDir(s)
}
{ ids, err := runtime.GetSandboxIDByPodUID(s.getCRI(), s.GetInitialId(), nil) if err != nil { return "", errors.Wrapf(err, "get pod cri_id by uid %s", s.GetInitialId()) } if len(ids) == 0 { return "", errors.Wrapf(errors.ErrNotFound, "not found cri pod by uid %s", s.GetInitialId()) } return ids[0], nil } func (s *sPodGuestInstance) ExitCleanup(clear bool) { } func (s *sPodGuestInstance) CleanDirtyGuest(ctx context.Context) error { _, err := s.CleanGuest(ctx, false) return err } func (s *sPodGuestInstance) ImportServer(pendingDelete bool) { // TODO: 参考SKVMGuestInstance,可以做更多的事,比如同步状态 s.manager.SaveServer(s.Id, s) s.manager.RemoveCandidateServer(s) if s.IsDaemon() || s.IsDirtyShutdown() { ctx := context.Background() cred := hostutils.GetComputeSession(ctx).GetToken() if err := s.StartLocalDirtyPod(ctx, cred); err != nil { log.Errorf("start local pod err %s", err.Error()) } } else { s.SyncStatus("sync status after host started", "") s.getProbeManager().AddPod(s) } } func (s *sPodGuestInstance) isPodDirtyShutdown() bool { if !s.IsRunning() && s.startStat.IsPodFileExists() { return true } return false } func (s *sPodGuestInstance) isContainerDirtyShutdown(ctrId string) bool { isRunning, err := s.IsContainerRunning(context.Background(), ctrId) if err != nil { log.Warningf("[isContainerDrityShutdown] IsContainerRunning(%s, %s): %v", s.GetId(), ctrId, err) } if !isRunning && s.startStat.IsContainerFileExists(ctrId) { return true } return false } func (s *sPodGuestInstance) IsDirtyShutdown() bool { if !s.manager.EnableDirtyRecoveryFeature() { return false } if s.isPodDirtyShutdown() { return true } for _, ctr := range s.GetContainers() { if s.isContainerDirtyShutdown(ctr.Id) { return true } } return false } func (s *sPodGuestInstance) getStatus(ctx context.Context, defaultStatus string) string { status := defaultStatus if status == "" { status = computeapi.VM_READY } if s.IsRunning() { status = computeapi.VM_RUNNING } for _, c := range s.GetContainers() { cStatus, cs, err 
:= s.getContainerStatus(ctx, c.Id) if err != nil { log.Errorf("get container %s status of pod %s", c.Id, s.Id) continue } if cs != nil { status = GetPodStatusByContainerStatus(status, cStatus, s.IsPrimaryContainer(c.Id)) } } return status } func (s *sPodGuestInstance) GetUploadStatus(ctx context.Context, reason string) (*computeapi.HostUploadGuestStatusInput, error) { // sync pod status var status = computeapi.VM_READY if s.IsRunning() { status = computeapi.VM_RUNNING } errs := make([]error, 0) if err := s.expectedStatus.SetStatus(status); err != nil { log.Warningf("set expected status to %s, reason: %s, err: %s", status, reason, err.Error()) } /*if status == computeapi.VM_READY { // remove pod if err := s.stopPod(ctx, 5); err != nil { log.Warningf("stop cri pod when sync status: %s: %v", s.Id, err) } }*/ // sync container's status cStatuss := make(map[string]*computeapi.ContainerPerformStatusInput) for _, c := range s.GetContainers() { cStatus, cs, err := s.getContainerStatus(ctx, c.Id) if err != nil { log.Errorf("get container %s status of pod %s", c.Id, s.Id) continue } /*if err := s.expectedStatus.SetContainerStatus(c.CRIId, c.Id, cStatus); err != nil { log.Warningf("expectedStatus.SetContainerStatus(%s, %s) to %s, error: %s", s.GetId(), c.Id, cStatus, err.Error()) }*/ ctrStatusInput := &computeapi.ContainerPerformStatusInput{ PerformStatusInput: apis.PerformStatusInput{ Status: cStatus, Reason: reason, HostId: hostinfo.Instance().HostId, }, } if cs != nil { if computeapi.ContainerNoFailedRunningStatus.Has(cStatus) { ctrStatusInput.RestartCount = 0 } else { ctrStatusInput.RestartCount = cs.RestartCount } if !cs.StartedAt.IsZero() { ctrStatusInput.StartedAt = &cs.StartedAt } if !cs.FinishedAt.IsZero() { ctrStatusInput.LastFinishedAt = &cs.FinishedAt } if ctr := s.GetContainerById(c.Id); ctr != nil { ctr.RestartCount = ctrStatusInput.RestartCount ctr.StartedAt = cs.StartedAt ctr.LastFinishedAt = cs.FinishedAt if err := s.SaveContainerDesc(ctr); err != nil { errs 
= append(errs, errors.Wrapf(err, "save container desc for %s/%s", ctr.Id, ctr.Name)) } } } cStatuss[c.Id] = ctrStatusInput status = GetPodStatusByContainerStatus(status, cStatus, s.IsPrimaryContainer(c.Id)) } if len(errs) > 0 { log.Errorf("get upload status error: %v", errors.NewAggregate(errs)) } statusInput := &apis.PerformStatusInput{ Status: status, Reason: reason, PowerStates: GetPowerStates(s), HostId: hostinfo.Instance().HostId, } return &computeapi.HostUploadGuestStatusInput{ PerformStatusInput: *statusInput, Containers: cStatuss, }, nil } // UploadStatus uploads the status of the pod and the specified container to the server // If uploadCtrId is not empty, only the status of the specified container will be uploaded // If uploadCtrId is empty, all containers' status will be uploaded func (s *sPodGuestInstance) UploadStatus(ctx context.Context, reason string, uploadCtrId string) error { /*resp, err := s.GetUploadStatus(ctx, reason) if err != nil { return errors.Wrapf(err, "get upload status of pod: %s", reason) } errs := make([]error, 0) // sync container's status for id, ctrStatusInput := range resp.Containers { if _, err := hostutils.UpdateContainerStatus(ctx, id, ctrStatusInput); err != nil { errs = append(errs, errors.Wrapf(err, "failed update container %s status", id)) } // 同步容器状态可能会出现 probing 状态,所以需要 mark 成 dirty,等待 probe manager 重新探测容器状态 s.markContainerProbeDirty(ctrStatusInput.Status, id, reason) } if _, err := hostutils.UpdateServerStatus(ctx, s.Id, &resp.PerformStatusInput); err != nil { errs = append(errs, errors.Wrapf(err, "failed update guest status")) }*/ // return errors.NewAggregate(errs) resp, err := s.GetUploadStatus(ctx, reason) if err != nil { return errors.Wrapf(err, "get upload status of pod: %s", reason) } containerStatuses := make(map[string]*statusman.ContainerStatus) for ctrId, cStatus := range resp.Containers { if uploadCtrId != "" && ctrId != uploadCtrId { continue } containerStatuses[ctrId] = &statusman.ContainerStatus{ Status: 
cStatus.Status, RestartCount: cStatus.RestartCount, StartedAt: cStatus.StartedAt, LastFinishedAt: cStatus.LastFinishedAt, } } if err := statusman.GetManager().UpdateStatus(&statusman.PodStatusUpdateRequest{ Id: s.Id, Pod: s, Status: resp.Status, ContainerStatuses: containerStatuses, Reason: reason, }); err != nil { return errors.Wrapf(err, "update status of pod: %s", reason) } return nil } func (s *sPodGuestInstance) PostUploadStatus(resp *computeapi.HostUploadGuestStatusInput, reason string) { for ctrId, cStatus := range resp.Containers { s.markContainerProbeDirty(cStatus.Status, ctrId, reason) } } func (s *sPodGuestInstance) SyncStatus(reason string, ctrId string) { if err := s.UploadStatus(context.Background(), reason, ctrId); err != nil { log.Warningf("upload status failed, reason: %s, err: %v", reason, err) } } func (s *sPodGuestInstance) DeployFs(ctx context.Context, userCred mcclient.TokenCredential, deployInfo *deployapi.DeployInfo) (jsonutils.JSONObject, error) { // update port_mappings /*podInput, err := s.getPodCreateParams() if err != nil { return nil, errors.Wrap(err, "getPodCreateParams") } if len(podInput.PortMappings) != 0 { pms, err := s.getPortMappings(podInput.PortMappings) if err != nil { return nil, errors.Wrap(err, "get port mappings") } if err := s.setPortMappings(ctx, userCred, s.convertToPodMetadataPortMappings(pms)); err != nil { return nil, errors.Wrap(err, "set port mappings") } }*/ return nil, nil } func (s *sPodGuestInstance) IsStopped() bool { //TODO implement me panic("implement me") } func (s *sPodGuestInstance) IsSuspend() bool { return false } func (s *sPodGuestInstance) getCRI() podutil.CRI { return s.manager.GetCRI() } func (s *sPodGuestInstance) getProbeManager() prober.Manager { return s.manager.GetContainerProbeManager() } func (s *sPodGuestInstance) getHostCPUMap() *podutil.HostContainerCPUMap { return s.manager.GetContainerCPUMap() } func (s *sPodGuestInstance) getPod(ctx context.Context) (*runtimeapi.PodSandbox, error) { 
// getPod looks up this pod's CRI sandbox by matching the sandbox metadata uid
// against the server id.
func (s *sPodGuestInstance) getPod(ctx context.Context) (*runtimeapi.PodSandbox, error) {
	pods, err := s.getCRI().ListPods(ctx, podutil.ListPodOptions{})
	if err != nil {
		return nil, errors.Wrap(err, "ListPods")
	}
	for _, p := range pods {
		if p.Metadata.Uid == s.Id {
			return p, nil
		}
	}
	return nil, errors.Wrap(httperrors.ErrNotFound, "Not found pod from containerd")
}

// IsRunning reports whether the pod sandbox exists and is in SANDBOX_READY
// state; any lookup error is treated as "not running".
func (s *sPodGuestInstance) IsRunning() bool {
	pod, err := s.getPod(context.Background())
	if err != nil {
		return false
	}
	if pod.GetState() == runtimeapi.PodSandboxState_SANDBOX_READY {
		return true
	}
	return false
}

// IsContainerRunning reports whether the given container's status falls in
// the "running" status set.
func (s *sPodGuestInstance) IsContainerRunning(ctx context.Context, ctrId string) (bool, error) {
	status, _, err := s.getContainerStatus(ctx, ctrId)
	if err != nil {
		return false, errors.Wrapf(err, "get container %s status error", ctrId)
	}
	if computeapi.ContainerRunningStatus.Has(status) {
		return true, nil
	}
	return false, nil
}

// probeGuestStatus refreshes resp.Status from the live pod/container state.
func (s *sPodGuestInstance) probeGuestStatus(ctx context.Context, resp *computeapi.HostUploadGuestStatusInput) {
	resp.Status = s.getStatus(ctx, resp.Status)
}

// HandleGuestStatus replaces the incoming status with a freshly computed
// upload payload. NOTE(review): the GetUploadStatus error is discarded here —
// confirm callers tolerate a nil return on failure.
func (s *sPodGuestInstance) HandleGuestStatus(ctx context.Context, resp *computeapi.HostUploadGuestStatusInput, isBatch bool) *computeapi.HostUploadGuestStatusInput {
	//s.probeGuestStatus(ctx, resp)
	//hostutils.TaskComplete(ctx, jsonutils.Marshal(resp))
	ctrStatus, _ := s.GetUploadStatus(ctx, "")
	return ctrStatus
}

// HandleGuestStart schedules an asynchronous pod start on the guest start
// worker; the NUMA pin info is attached to the result when the scheduler-side
// NUMA allocation is disabled.
func (s *sPodGuestInstance) HandleGuestStart(ctx context.Context, userCred mcclient.TokenCredential, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
	hostutils.DelayTaskWithWorker(ctx, func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
		resp, err := s.startPod(ctx, userCred)
		if err != nil {
			return nil, errors.Wrap(err, "startPod")
		}
		resJ := jsonutils.Marshal(resp)
		res := resJ.(*jsonutils.JSONDict)
		if !s.manager.host.IsSchedulerNumaAllocateEnabled() {
			res.Set("cpu_numa_pin", jsonutils.Marshal(s.Desc.CpuNumaPin))
		}
		return res, nil
	}, nil, s.manager.GuestStartWorker)
	return nil, nil
}

// HandleStop schedules an asynchronous pod stop with the given timeout.
func (s *sPodGuestInstance) HandleStop(ctx context.Context, timeout int64) error {
	hostutils.DelayTask(ctx, func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
		err := s.stopPod(ctx, timeout)
		if err != nil {
			return nil, errors.Wrap(err, "stopPod")
		}
		return nil, nil
	}, nil)
	return nil
}

// getCreateParams fetches the original creation parameters stored in guest
// metadata as a JSON string.
func (s *sPodGuestInstance) getCreateParams() (jsonutils.JSONObject, error) {
	createParamsStr, ok := s.GetDesc().Metadata[computeapi.VM_METADATA_CREATE_PARAMS]
	if !ok {
		return nil, errors.Errorf("not found %s in metadata", computeapi.VM_METADATA_CREATE_PARAMS)
	}
	return jsonutils.ParseString(createParamsStr)
}

// getPostStopCleanupConfig parses the optional post-stop cleanup config from
// metadata; (nil, nil) means no cleanup was configured.
func (s *sPodGuestInstance) getPostStopCleanupConfig() (*computeapi.PodPostStopCleanupConfig, error) {
	data, ok := s.GetDesc().Metadata[computeapi.POD_METADATA_POST_STOP_CLEANUP_CONFIG]
	if !ok {
		return nil, nil
	}
	jsonData, err := jsonutils.ParseString(data)
	if err != nil {
		return nil, errors.Wrapf(err, "parse to json")
	}
	input := new(computeapi.PodPostStopCleanupConfig)
	if err := jsonData.Unmarshal(input); err != nil {
		return nil, errors.Wrapf(err, "unmarshal to pod post stop cleanup config")
	}
	return input, nil
}

// getPodCreateParams unmarshals the "pod" section of the creation parameters.
func (s *sPodGuestInstance) getPodCreateParams() (*computeapi.PodCreateInput, error) {
	createParams, err := s.getCreateParams()
	if err != nil {
		return nil, errors.Wrapf(err, "getCreateParams")
	}
	input := new(computeapi.PodCreateInput)
	if err := createParams.Unmarshal(input, "pod"); err != nil {
		return nil, errors.Wrapf(err, "unmarshal to pod creation input")
	}
	return input, nil
}

func (s *sPodGuestInstance) getPodLogDir() string {
	return filepath.Join(s.HomeDir(), "logs")
}

func (s *sPodGuestInstance) GetPodLogDir() string {
	return s.getPodLogDir()
}

// getContainerLogPath returns the log path relative to the pod log dir
// ("<ctrId>.log").
func (s *sPodGuestInstance) getContainerLogPath(ctrId string) string {
	return filepath.Join(fmt.Sprintf("%s.log", ctrId))
}

func (s *sPodGuestInstance) GetContainerLogPath(ctrId string) string {
	return s.getContainerLogPath(ctrId)
}

// ListContainerCriIds returns a snapshot map of container id -> CRI id.
func (s *sPodGuestInstance) ListContainerCriIds() map[string]string {
	out := make(map[string]string, len(s.containers))
	for ctrId, c := range s.containers {
		out[ctrId] = c.CRIId
	}
	return out
}

func (s *sPodGuestInstance) getShmDir() string {
	return filepath.Join(s.HomeDir(), "shm")
}

func (s *sPodGuestInstance) getContainerShmDir(containerName string) string {
	return filepath.Join(s.getShmDir(), fmt.Sprintf("%s-shm", containerName))
}

func (s *sPodGuestInstance) GetDisks() []*desc.SGuestDisk {
	return s.GetDesc().Disks
}

// ConvertRootFsToVolumeMount adapts a container rootfs spec to the generic
// volume-mount shape so the volume_mount drivers can handle it.
func (s *sPodGuestInstance) ConvertRootFsToVolumeMount(rootFs *hostapi.ContainerRootfs) *hostapi.ContainerVolumeMount {
	return &hostapi.ContainerVolumeMount{
		Type: rootFs.Type,
		Disk: rootFs.Disk,
	}
}

// mountRootFs mounts the container's root filesystem via its volume driver.
func (s *sPodGuestInstance) mountRootFs(ctrId string, rootFs *hostapi.ContainerRootfs) error {
	drv := volume_mount.GetDriver(rootFs.Type)
	vol := s.ConvertRootFsToVolumeMount(rootFs)
	if err := drv.Mount(s, ctrId, vol); err != nil {
		return errors.Wrapf(err, "mount root fs %s, ctrId %s", jsonutils.Marshal(rootFs), ctrId)
	}
	return nil
}

// clearContainerRootFs wipes the rootfs host directory for non-persistent
// rootfs specs; persistent rootfs content is left untouched.
func (s *sPodGuestInstance) clearContainerRootFs(ctrId string, rootFs *hostapi.ContainerRootfs) error {
	if rootFs == nil {
		return nil
	}
	drv := volume_mount.GetDriver(rootFs.Type)
	vol := s.ConvertRootFsToVolumeMount(rootFs)
	hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
	if err != nil {
		return errors.Wrapf(err, "get runtime mount path")
	}
	if !rootFs.Persistent {
		if err := volume_mount.RemoveDir(hostPath); err != nil {
			return errors.Wrapf(err, "remove %q", hostPath)
		}
		log.Infof("clear rootfs of container %s/%s: %q", s.GetName(), ctrId, hostPath)
	}
	return nil
}
// umountRootFs clears (if non-persistent) and unmounts a container rootfs.
// Disconnect is deliberately NOT done here; see umountPodVolumes for the
// ordering.
func (s *sPodGuestInstance) umountRootFs(ctrId string, rootFs *hostapi.ContainerRootfs) error {
	if rootFs == nil {
		return nil
	}
	drv := volume_mount.GetDriver(rootFs.Type).(volume_mount.IConnectedVolumeMount)
	vol := s.ConvertRootFsToVolumeMount(rootFs)
	if err := s.clearContainerRootFs(ctrId, rootFs); err != nil {
		return errors.Wrapf(err, "clear rootfs of container %s", ctrId)
	}
	if err := drv.UnmountWithoutDisconnect(s, ctrId, vol); err != nil {
		return errors.Wrapf(err, "unmount root fs %s, ctrId %s", jsonutils.Marshal(rootFs), ctrId)
	}
	return nil
}

// GetRootFsMountPath resolves the host-side mount path of a container's
// rootfs; ErrNotFound when the container or its rootfs spec is missing.
func (s *sPodGuestInstance) GetRootFsMountPath(ctrId string) (string, error) {
	ctr := s.GetContainerById(ctrId)
	if ctr == nil {
		return "", errors.Wrapf(httperrors.ErrNotFound, "not found container %s", ctrId)
	}
	rootFs := ctr.Spec.Rootfs
	if rootFs == nil {
		return "", errors.Wrapf(httperrors.ErrNotFound, "not found root fs %s", ctrId)
	}
	vol := s.ConvertRootFsToVolumeMount(rootFs)
	drv := volume_mount.GetDriver(vol.Type)
	hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
	if err != nil {
		return "", errors.Wrapf(err, "get runtime mount host path %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
	}
	return hostPath, nil
}

// mountPodVolumes mounts all container rootfs first, then all declared volume
// mounts, applying fsUser/fsGroup ownership where requested.
func (s *sPodGuestInstance) mountPodVolumes() error {
	for _, ctr := range s.GetDesc().Containers {
		if ctr.Spec.Rootfs == nil {
			continue
		}
		if err := s.mountRootFs(ctr.Id, ctr.Spec.Rootfs); err != nil {
			return errors.Wrapf(err, "mount root fs %s, ctrId %s", jsonutils.Marshal(ctr.Spec.Rootfs), ctr.Id)
		}
	}
	for ctrId, vols := range s.getContainerVolumeMounts() {
		for _, vol := range vols {
			drv := volume_mount.GetDriver(vol.Type)
			if err := drv.Mount(s, ctrId, vol); err != nil {
				return errors.Wrapf(err, "mount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
			}
			if vol.FsUser != nil || vol.FsGroup != nil {
				// change mountpoint owner
				if err := volume_mount.ChangeDirOwner(s, drv, ctrId, vol); err != nil {
					return errors.Wrapf(err, "change dir owner")
				}
			}
		}
	}
	return nil
}

// umountPodVolumes unmounts everything in a strict order: volume mounts are
// unmounted first (Disconnect deferred via closures), then rootfs, and only
// after all unmounts succeed are the collected Disconnects executed — a
// connected backend (e.g. a shared disk) may serve several mounts.
func (s *sPodGuestInstance) umountPodVolumes() error {
	disConnectFuncs := make([]func() error, 0)
	for ctrId, vols := range s.getContainerVolumeMounts() {
		for _, vol := range vols {
			drv := volume_mount.GetDriver(vol.Type)
			connectedDrv, ok := drv.(volume_mount.IConnectedVolumeMount)
			if !ok {
				if err := drv.Unmount(s, ctrId, vol); err != nil {
					return errors.Wrapf(err, "Unmount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
				}
			} else {
				if err := connectedDrv.UnmountWithoutDisconnect(s, ctrId, vol); err != nil {
					return errors.Wrapf(err, "UnmountWithoutDisconnect volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
				}
				// copy loop variables so each deferred closure captures its own pair
				tmpCtrId := ctrId
				tmpVol := vol
				disConnectFuncs = append(disConnectFuncs, func() error {
					if err := connectedDrv.Disconnect(s, tmpCtrId, tmpVol); err != nil {
						return errors.Wrapf(err, "Disconnect volume %s, ctrId %s", jsonutils.Marshal(tmpVol), tmpCtrId)
					}
					return nil
				})
			}
		}
	}
	for _, ctr := range s.GetDesc().Containers {
		if ctr.Spec.Rootfs == nil {
			continue
		}
		if err := s.umountRootFs(ctr.Id, ctr.Spec.Rootfs); err != nil {
			return errors.Wrapf(err, "umount root fs %s, ctrId %s", jsonutils.Marshal(ctr.Spec.Rootfs), ctr.Id)
		}
	}
	for _, disConnectFunc := range disConnectFuncs {
		if err := disConnectFunc(); err != nil {
			return errors.Wrapf(err, "disconnect volume")
		}
	}
	return nil
}

func (s *sPodGuestInstance) GetContainers() []*hostapi.ContainerDesc {
	return s.GetDesc().Containers
}

// GetPodContainerCriIds returns the CRI ids of all locally tracked containers.
func (s *sPodGuestInstance) GetPodContainerCriIds() []string {
	criids := make([]string, 0)
	for i := range s.containers {
		criids = append(criids, s.containers[i].CRIId)
	}
	return criids
}

// HasContainerNvidiaGpu reports whether any isolated device attached to the
// pod is of an NVIDIA GPU type.
func (s *sPodGuestInstance) HasContainerNvidiaGpu() bool {
	for i := range s.Desc.IsolatedDevices {
		if utils.IsInStringArray(s.Desc.IsolatedDevices[i].DevType, computeapi.NVIDIA_GPU_TYPES) {
			return true
		}
	}
	return false
}

// GetContainerById returns the container description matching ctrId, nil if
// absent.
func (s *sPodGuestInstance) GetContainerById(ctrId string) *hostapi.ContainerDesc {
	ctrs := s.GetContainers()
	for i := range ctrs {
		ctr := ctrs[i]
		if ctr.Id == ctrId {
			return ctr
		}
	}
	return nil
}

// SaveContainerDesc replaces the matching container entry in the guest
// description and persists the whole description to disk.
func (s *sPodGuestInstance) SaveContainerDesc(ctr *hostapi.ContainerDesc) error {
	ctrs := s.GetContainers()
	for i := range ctrs {
		tmp := ctrs[i]
		if tmp.Id == ctr.Id {
			ctrs[i] = ctr
		}
	}
	s.GetDesc().Containers = ctrs
	return SaveDesc(s, s.GetDesc())
}

// getContainerVolumeMounts groups the declared volume mounts by container id.
func (s *sPodGuestInstance) getContainerVolumeMounts() map[string][]*hostapi.ContainerVolumeMount {
	result := make(map[string][]*hostapi.ContainerVolumeMount, 0)
	for _, ctr := range s.GetDesc().Containers {
		mnts, ok := result[ctr.Id]
		if !ok {
			mnts = make([]*hostapi.ContainerVolumeMount, 0)
		}
		for _, vol := range ctr.Spec.VolumeMounts {
			tmp := vol
			mnts = append(mnts, tmp)
		}
		result[ctr.Id] = mnts
	}
	return result
}

// getContainerVolumeMountsByDiskId filters one container's volume mounts to
// those backed by the given disk.
func (s *sPodGuestInstance) getContainerVolumeMountsByDiskId(ctrId, diskId string) []*hostapi.ContainerVolumeMount {
	ctrVols := s.getContainerVolumeMounts()
	vols, ok := ctrVols[ctrId]
	if !ok {
		return nil
	}
	volList := make([]*hostapi.ContainerVolumeMount, 0)
	for _, vol := range vols {
		if vol.Disk != nil {
			if vol.Disk.Id == diskId {
				tmpVol := vol
				volList = append(volList, tmpVol)
			}
		}
	}
	return volList
}

func (s *sPodGuestInstance) GetVolumesDir() string {
	return filepath.Join(s.HomeDir(), "volumes")
}

func (s *sPodGuestInstance) GetVolumesOverlayDir() string {
	return filepath.Join(s.GetVolumesDir(), "_overlay_")
}

func (s *sPodGuestInstance) GetDiskMountPoint(disk storageman.IDisk) string {
	return s.GetDiskMountPointById(disk.GetId())
}

func (s *sPodGuestInstance) GetDiskMountPointById(diskId string) string {
	return filepath.Join(s.GetVolumesDir(), diskId)
}

// getPodPrivilegedMode reports whether any container in the creation input
// requested privileged mode (the sandbox is then privileged as a whole).
func (s *sPodGuestInstance) getPodPrivilegedMode(input *computeapi.PodCreateInput) bool {
	for _, ctr := range input.Containers {
		if ctr.Privileged {
			return true
		}
	}
	return false
}

// getOtherPods collects every other pod instance registered with the manager.
func (s *sPodGuestInstance) getOtherPods() []*sPodGuestInstance {
	man := s.manager
	otherPods := make([]*sPodGuestInstance, 0)
	man.Servers.Range(func(id, value any) bool {
		if id == s.Id {
			return true
		}
		ins := value.(GuestRuntimeInstance)
		pod, ok := ins.(*sPodGuestInstance)
		if !ok {
			return true
		}
		otherPods = append(otherPods, pod)
		return true
	})
	return otherPods
}

// PodCgroupParent is the cgroup parent under which all pod sandboxes run.
func PodCgroupParent() string {
	return "/cloudpods"
}

// localDirtyPodStartTask restarts a dirty-shutdown pod and its containers on
// the guest start worker.
type localDirtyPodStartTask struct {
	ctx      context.Context
	userCred mcclient.TokenCredential
	pod      *sPodGuestInstance
}

func newLocalDirtyPodStartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *localDirtyPodStartTask {
	return &localDirtyPodStartTask{
		ctx:      ctx,
		userCred: userCred,
		pod:      pod,
	}
}
// Run starts the pod sandbox (if needed) and every container that was shut
// down uncleanly, then syncs the resulting status.
func (t *localDirtyPodStartTask) Run() {
	/*if t.pod.isPodDirtyShutdown() {
		log.Infof("start dirty pod locally (%s/%s)", t.pod.Id, t.pod.GetName())
		if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
			log.Errorf("start dirty pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
		}
	}*/
	for _, ctr := range t.pod.GetContainers() {
		if t.pod.isContainerDirtyShutdown(ctr.Id) {
			if !t.pod.IsRunning() {
				log.Infof("start dirty pod locally (%s/%s)", t.pod.Id, t.pod.GetName())
				if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
					log.Errorf("start dirty pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
				}
			}
			log.Infof("start dirty container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
			if _, err := t.pod.StartLocalContainer(t.ctx, t.userCred, ctr.Id); err != nil {
				log.Errorf("start dirty container %s err: %s", ctr.Id, err.Error())
			}
		}
	}
	t.pod.SyncStatus("sync status after dirty pod start locally", "")
}

func (t *localDirtyPodStartTask) Dump() string {
	return fmt.Sprintf("pod start task %s/%s", t.pod.GetId(), t.pod.GetName())
}

// StartLocalDirtyPod enqueues a dirty-pod restart on the guest start worker.
func (s *sPodGuestInstance) StartLocalDirtyPod(ctx context.Context, userCred mcclient.TokenCredential) error {
	s.manager.GuestStartWorker.Run(newLocalDirtyPodStartTask(ctx, userCred, s), nil, nil)
	return nil
}

// ShouldRestartPodOnCrash: single-container pods restart the whole pod.
func (s *sPodGuestInstance) ShouldRestartPodOnCrash() bool {
	if len(s.GetContainers()) <= 1 {
		return true
	}
	return false
}

// IsPrimaryContainer reports whether ctrId is the pod's primary container;
// the sole container of a single-container pod is always primary.
func (s *sPodGuestInstance) IsPrimaryContainer(ctrId string) bool {
	ctr := s.GetContainerById(ctrId)
	if ctr == nil {
		return false
	}
	if len(s.containers) == 1 {
		return true
	}
	return ctr.Spec.Primary
}

// startPod serializes pod starts and retries _startPod up to 3 times with a
// linearly growing back-off (5s, 10s, 15s).
// NOTE(review): the loop also sleeps after the final failed attempt before
// returning — consider skipping the last sleep.
func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) {
	s.startPodLock.Lock()
	defer s.startPodLock.Unlock()
	retries := 3
	sec := 5 * time.Second
	var err error
	var resp *computeapi.PodStartResponse
	for i := 1; i <= retries; i++ {
		resp, err = s._startPod(ctx, userCred)
		if err == nil {
			return resp, nil
		}
		log.Errorf("start pod %s/%s error with %d times: %v", s.GetId(), s.GetName(), i, err)
		time.Sleep(sec * time.Duration(i))
	}
	return resp, err
}

// namespacesForPod builds the sandbox namespace options: IPC/network are
// pod-scoped (IPC switched to node scope for HostIPC), PID is per container.
func (s *sPodGuestInstance) namespacesForPod(input *computeapi.PodCreateInput) *runtimeapi.NamespaceOption {
	opt := &runtimeapi.NamespaceOption{
		Ipc:     runtimeapi.NamespaceMode_POD,
		Network: runtimeapi.NamespaceMode_POD,
		Pid:     runtimeapi.NamespaceMode_CONTAINER,
	}
	if input.HostIPC {
		opt.Ipc = runtimeapi.NamespaceMode_NODE
	}
	return opt
}

// updateGuestDesc rebuilds the working desc from the source desc and
// re-allocates the CPU/NUMA pinning.
func (s *sPodGuestInstance) updateGuestDesc() error {
	s.Desc = new(desc.SGuestDesc)
	err := jsonutils.Marshal(s.SourceDesc).Unmarshal(s.Desc)
	if err != nil {
		return errors.Wrap(err, "unmarshal source desc")
	}
	return s.allocateCpuNumaPin()
}

// _startPod performs one pod start attempt: refresh desc, mount volumes,
// remove any stale sandbox, build the CRI sandbox config (security context,
// optional CNI port mappings), run the sandbox, record CRI info, apply cgroup
// limits, register with the prober and drop the start marker file.
func (s *sPodGuestInstance) _startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) {
	podInput, err := s.getPodCreateParams()
	if err != nil {
		return nil, errors.Wrap(err, "getPodCreateParams")
	}
	if err := s.updateGuestDesc(); err != nil {
		return nil, errors.Wrap(err, "updateGuestDesc")
	}
	if err := s.mountPodVolumes(); err != nil {
		return nil, errors.Wrap(err, "mountPodVolumes")
	}
	// best-effort removal of a leftover sandbox before creating a new one
	if err := s.ensurePodRemoved(ctx, 0); err != nil {
		log.Warningf("ensure pod removed before starting %s: %v", s.GetId(), err)
	}
	podCfg := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      s.GetDesc().Name,
			Uid:       s.GetId(),
			Namespace: s.GetDesc().TenantId,
			Attempt:   1,
		},
		Hostname:     s.GetDesc().Hostname,
		LogDirectory: s.getPodLogDir(),
		DnsConfig:    nil,
		PortMappings: nil,
		Labels: map[string]string{
			runtime.PodUIDLabel: s.GetId(),
		},
		Annotations: nil,
		Linux: &runtimeapi.LinuxPodSandboxConfig{
			CgroupParent: PodCgroupParent(),
			SecurityContext: &runtimeapi.LinuxSandboxSecurityContext{
				NamespaceOptions:   s.namespacesForPod(podInput),
				SelinuxOptions:     nil,
				RunAsUser:          nil,
				RunAsGroup:         nil,
				ReadonlyRootfs:     false,
				SupplementalGroups: nil,
				Privileged:         s.getPodPrivilegedMode(podInput),
				Seccomp: &runtimeapi.SecurityProfile{
					ProfileType: runtimeapi.SecurityProfile_Unconfined,
				},
				Apparmor: &runtimeapi.SecurityProfile{
					ProfileType: runtimeapi.SecurityProfile_Unconfined,
				},
				SeccompProfilePath: "",
			},
			Sysctls: nil,
		},
		Windows: nil,
	}
	// inject pod security context
	podSec := podInput.SecurityContext
	if podSec != nil {
		/*podCfg.Linux.Sysctls = map[string]string{
			"net.ipv4.ip_unprivileged_port_start": "80",
		}*/
		if podSec.RunAsUser != nil {
			podCfg.Linux.SecurityContext.RunAsUser = &runtimeapi.Int64Value{
				Value: *podSec.RunAsUser,
			}
		}
		if podSec.RunAsGroup != nil {
			podCfg.Linux.SecurityContext.RunAsGroup = &runtimeapi.Int64Value{
				Value: *podSec.RunAsGroup,
			}
		}
	}
	if options.HostOptions.EnableContainerCniPortmap {
		metaPms, err := s.GetPortMappings()
		if err != nil {
			return nil, errors.Wrap(err, "GetPortMappings")
		}
		if len(metaPms) != 0 {
			pms := make([]*runtimeapi.PortMapping, len(metaPms))
			for idx := range metaPms {
				pm := metaPms[idx]
				proto := runtimeapi.Protocol_TCP
				switch pm.Protocol {
				case computeapi.PodPortMappingProtocolTCP:
					proto = runtimeapi.Protocol_TCP
				case computeapi.PodPortMappingProtocolUDP:
					proto = runtimeapi.Protocol_UDP
				}
				// NOTE(review): pm.HostPort is dereferenced without a nil
				// check — confirm port mappings always carry a host port here
				pms[idx] = &runtimeapi.PortMapping{
					Protocol:      proto,
					ContainerPort: int32(pm.Port),
					HostPort:      int32(*pm.HostPort),
					HostIp:        pm.HostIp,
				}
			}
			podCfg.PortMappings = pms
		}
	}
	criId, err := s.getCRI().RunPod(ctx, podCfg, "")
	if err != nil {
		return nil, errors.Wrap(err, "cri.RunPod")
	}
	if err := s.setCRIInfo(ctx, userCred, criId, podCfg); err != nil {
		return nil, errors.Wrap(err, "setCRIId")
	}
	// set pod cgroup resources
	if err := s.setPodCgroupResources(criId, s.GetDesc().Mem, s.GetDesc().Cpu); err != nil {
		return nil, errors.Wrapf(err, "set pod %s cgroup memMB %d, cpu %d", criId, s.GetDesc().Mem, s.GetDesc().Cpu)
	}
	s.getProbeManager().AddPod(s)
	if err := s.startStat.CreatePodFile(); err != nil {
		return nil, errors.Wrap(err, "startStat.CreatePodFile")
	}
	return &computeapi.PodStartResponse{
		CRIId:     criId,
		IsRunning: false,
	}, nil
}

// setPodCgroupResources applies the memory limit (MB -> bytes) and a CPU CFS
// quota of cpuCnt periods to the sandbox cgroup.
func (s *sPodGuestInstance) setPodCgroupResources(criId string, memMB int64, cpuCnt int64) error {
	if err := s.getCGUtil().SetMemoryLimitBytes(criId, memMB*1024*1024); err != nil {
		return errors.Wrap(err, "set cgroup memory limit")
	}
	if err := s.getCGUtil().SetCPUCfs(criId, cpuCnt*s.getDefaultCPUPeriod(), s.getDefaultCPUPeriod()); err != nil {
		return errors.Wrap(err, "set cgroup cfs")
	}
	return nil
}

// ensurePodRemoved removes the sandbox both by the stored CRI id and by a
// fresh CRI lookup (they may diverge after a dirty shutdown), unregisters the
// pod from the prober and clears the start marker. timeout==0 means 15s.
func (s *sPodGuestInstance) ensurePodRemoved(ctx context.Context, timeout int64) error {
	if timeout == 0 {
		timeout = 15
	}
	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
	defer cancel()
	/*if err := s.getCRI().StopPod(ctx, &runtimeapi.StopPodSandboxRequest{
		PodSandboxId: s.GetCRIId(),
	}); err != nil {
		return errors.Wrapf(err, "stop cri pod: %s", s.GetCRIId())
	}*/
	criId := s.GetCRIId()
	if criId != "" {
		if err := s.getCRI().RemovePod(ctx, s.GetCRIId()); err != nil {
			return errors.Wrapf(err, "remove cri pod: %s", s.GetCRIId())
		}
	}
	p, _ := s.getPod(ctx)
	if p != nil {
		if err := s.getCRI().RemovePod(ctx, p.GetId()); err != nil {
			return errors.Wrapf(err, "remove cri pod: %s", p.GetId())
		}
	}
	s.getProbeManager().RemovePod(s)
	if err := s.startStat.RemovePodFile(); err != nil {
		return errors.Wrap(err, "startStat.RemovePodFile")
	}
	return nil
}

// stopPod unmounts volumes, removes the sandbox, releases the cpuset and runs
// the configured post-stop cleanup.
func (s *sPodGuestInstance) stopPod(ctx context.Context, timeout int64) error {
	if err := s.umountPodVolumes(); err != nil {
		return errors.Wrapf(err, "umount pod volumes")
	}
	if timeout == 0 {
		timeout = 15
	}
	if err := s.ensurePodRemoved(ctx, timeout); err != nil {
		return err
	}
	ReleaseGuestCpuset(s.manager, s)
	if err := s.postStopCleanup(ctx); err != nil {
		return errors.Wrapf(err, "post stop cleanup")
	}
	return nil
}

// postStopCleanup removes the directories listed in the pod's post-stop
// cleanup config, if any is configured.
func (s *sPodGuestInstance) postStopCleanup(ctx context.Context) error {
	config, err := s.getPostStopCleanupConfig()
	if err != nil {
		return errors.Wrapf(err, "get post stop cleanup config")
	}
	if config == nil {
		return nil
	}
	for _, dir := range config.Dirs {
		if err := os.RemoveAll(dir); err != nil {
			return errors.Wrapf(err, "remove dir %s", dir)
		}
	}
	return nil
}
// LoadDesc loads the persisted guest desc and then the pod's container
// id -> CRI-id map from disk.
func (s *sPodGuestInstance) LoadDesc() error {
	if err := LoadDesc(s); err != nil {
		return errors.Wrap(err, "LoadDesc")
	}
	if err := s.loadContainers(); err != nil {
		return errors.Wrap(err, "loadContainers")
	}
	return nil
}

// loadContainers populates s.containers from the JSON "containers" file in
// the guest home dir. A missing file is not an error (empty map is kept).
func (s *sPodGuestInstance) loadContainers() error {
	s.containers = make(map[string]*sContainer)
	ctrFile := s.getContainersFilePath()
	if !fileutils2.Exists(ctrFile) {
		log.Warningf("pod %s containers file %s doesn't exist", s.Id, ctrFile)
		return nil
	}
	ctrStr, err := ioutil.ReadFile(ctrFile)
	if err != nil {
		return errors.Wrapf(err, "read %s", ctrFile)
	}
	obj, err := jsonutils.Parse(ctrStr)
	if err != nil {
		return errors.Wrapf(err, "jsonutils.Parse %s", ctrStr)
	}
	ctrs := make(map[string]*sContainer)
	if err := obj.Unmarshal(ctrs); err != nil {
		return errors.Wrapf(err, "unmarshal %s to container map", obj.String())
	}
	s.containers = ctrs
	return nil
}

// PostLoad restores the guest's cpuset allocation after desc loading.
func (s *sPodGuestInstance) PostLoad(m *SGuestManager) error {
	return LoadGuestCpuset(m, s)
}

// SyncConfig persists a new desc for the pod. CPU/memory hotplug is handled
// by SGuestHotplugCpuMemTask, so only hardware/container sections are copied
// here and the existing CPU NUMA pinning is preserved.
func (s *sPodGuestInstance) SyncConfig(ctx context.Context, guestDesc *desc.SGuestDesc, fwOnly, setUefiBootOrder bool) (jsonutils.JSONObject, error) {
	if err := SaveDesc(s, guestDesc); err != nil {
		return nil, errors.Wrap(err, "SaveDesc")
	}
	// Update the guest live desc; do not update cpu and mem here —
	// cpu and memory should be updated from SGuestHotplugCpuMemTask.
	s.UpdateLiveDesc(guestDesc)
	// Keep the original cpu numa pin across the desc overwrite.
	cpuNumaPin := s.Desc.CpuNumaPin
	s.Desc.SGuestHardwareDesc = guestDesc.SGuestHardwareDesc
	s.Desc.SGuestContainerDesc = guestDesc.SGuestContainerDesc
	s.Desc.CpuNumaPin = cpuNumaPin
	if err := SaveLiveDesc(s, s.Desc); err != nil {
		return nil, errors.Wrap(err, "SaveLiveDesc")
	}
	return nil, nil
}

// getContainerMeta returns the locally tracked container record, or nil.
func (s *sPodGuestInstance) getContainerMeta(id string) *sContainer {
	return s.containers[id]
}

// getContainerCRIId resolves a container id to its CRI runtime id.
func (s *sPodGuestInstance) getContainerCRIId(ctrId string) (string, error) {
	ctr := s.getContainerMeta(ctrId)
	if ctr == nil {
		return "", errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId)
	}
	return ctr.CRIId, nil
}

// GetContainerByCRIId performs the reverse lookup: CRI id -> container desc.
func (s *sPodGuestInstance) GetContainerByCRIId(criId string) (*hostapi.ContainerDesc, error) {
	for _, ctr := range s.containers {
		if ctr.CRIId == criId {
			desc := s.GetContainerById(ctr.Id)
			if desc == nil {
				return nil, errors.Wrapf(errors.ErrNotFound, "Not found container desc by CRIId %s", criId)
			}
			return desc, nil
		}
	}
	return nil, errors.Wrapf(errors.ErrNotFound, "Not found container by CRIId %s", criId)
}

// StartLocalContainer restarts a known container from its stored spec,
// bumping RestartCount by one before delegating to StartContainer.
func (s *sPodGuestInstance) StartLocalContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
	ctr := s.GetContainerById(ctrId)
	if ctr == nil {
		return nil, errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId)
	}
	input := &hostapi.ContainerCreateInput{
		Name:         ctr.Name,
		GuestId:      s.GetId(),
		Spec:         ctr.Spec,
		RestartCount: ctr.RestartCount + 1,
	}
	ret, err := s.StartContainer(ctx, userCred, ctrId, input)
	if err != nil {
		return nil, errors.Wrap(err, "start container")
	}
	return ret, nil
}

// StartContainer starts (recreating first if needed) the container ctrId.
// An unknown or exited/missing container is deleted and recreated; then the
// CRI container is started and post-start configuration is applied: cgroup
// device rules (eBPF path, cgroup v1 only), pids.max, post-start lifecycle
// hooks, the startup stat file, and optional resource limits.
func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
	_, hasCtr := s.containers[ctrId]
	needRecreate := false
	if hasCtr {
		status, _, err := s.getContainerStatus(ctx, ctrId)
		if err != nil {
			if IsContainerNotFoundError(err) {
				// Tracked locally but gone from the runtime: recreate.
				needRecreate = true
			} else {
				return nil, errors.Wrap(err, "get container status")
			}
		} else {
			if computeapi.ContainerExitedStatus.Has(status) {
				needRecreate = true
			} else if status != computeapi.CONTAINER_STATUS_CREATED {
				return nil, errors.Errorf("can't start container when status is %s", status)
			}
		}
	}
	if err := s.clearContainerRootFs(ctrId, input.Spec.Rootfs); err != nil {
		return nil, errors.Wrapf(err, "clear container rootfs %s before starting", jsonutils.Marshal(input.Spec.Rootfs))
	}
	if !hasCtr || needRecreate {
		log.Infof("recreate container %s before starting. hasCtr: %v, needRecreate: %v", ctrId, hasCtr, needRecreate)
		// delete and recreate the container before starting
		if hasCtr {
			if _, err := s.DeleteContainer(ctx, userCred, ctrId); err != nil {
				return nil, errors.Wrap(err, "delete container before starting")
			}
		}
		if _, err := s.CreateContainer(ctx, userCred, ctrId, input); err != nil {
			return nil, errors.Wrap(err, "recreate container before starting")
		}
	}
	criId, err := s.getContainerCRIId(ctrId)
	if err != nil {
		return nil, errors.Wrap(err, "get container cri id")
	}
	if err := s.expectedStatus.SetContainerStatus(criId, ctrId, computeapi.CONTAINER_STATUS_RUNNING); err != nil {
		log.Warningf("set container %s(%s) expected status to running: %v", criId, ctrId, err)
	}
	if err := s.getCRI().StartContainer(ctx, criId); err != nil {
		return nil, errors.Wrap(err, "CRI.StartContainer")
	}
	// cgroup v2: device rules were already applied through the containerd API
	// at CreateContainer time, so skip the eBPF path here.
	// cgroup v1: keep using the original eBPF mechanism.
	isV2, err := podutil.DetectCgroupVersion()
	if err != nil {
		log.Warningf("[StartContainer] Failed to detect cgroup version: %v, using eBPF device allow", err)
		// Fall back to the v1 (eBPF) path on detection failure.
		isV2 = false
	}
	if !isV2 {
		// cgroup v1 case: use the original eBPF based devices.allow mechanism.
		if err := s.setContainerCgroupDevicesAllow(criId, input.Spec.CgroupDevicesAllow, input.Spec.Devices); err != nil {
			return nil, errors.Wrap(err, "set cgroup devices allow")
		}
	} else {
		log.Debugf("[StartContainer] cgroup v2 detected, skipping eBPF device allow (devices already set via containerd API)")
	}
	if input.Spec.CgroupPidsMax > 0 {
		if err := s.getCGUtil().SetPidsMax(criId, input.Spec.CgroupPidsMax); err != nil {
			return nil, errors.Wrap(err, "set cgroup pids.max")
		}
	}
	if err := s.doContainerStartPostLifecycle(ctx, criId, input); err != nil {
		return nil, errors.Wrap(err, "do container lifecycle")
	}
	if err := s.startStat.CreateContainerFile(ctrId); err != nil {
		return nil, errors.Wrapf(err, "create container startup stat file %s", ctrId)
	}
	if input.Spec.ResourcesLimit != nil {
		if err := s.setContainerResourcesLimit(criId, input.Spec.ResourcesLimit); err != nil {
			return nil, errors.Wrap(err, "set container resources limit")
		}
	}
	return nil, nil
}

// allocateCpuNumaPin computes the pod's CPU pinning. No-op if a pinning
// already exists. A metadata-declared cpuset wins; otherwise CPUs are
// allocated from the host cpuset allocator (preferring the NUMA node of the
// first isolated device) and recorded either as a flat VcpuPin or, under
// hostagent NUMA allocation, as per-node SCpuNumaPin entries.
func (s *sPodGuestInstance) allocateCpuNumaPin() error {
	if len(s.Desc.CpuNumaPin) != 0 || len(s.Desc.VcpuPin) != 0 {
		return nil
	}
	// Explicit cgroup cpuset from server metadata takes precedence.
	if scpuset, ok := s.Desc.Metadata[computeapi.VM_METADATA_CGROUP_CPUSET]; ok {
		cpusetJson, err := jsonutils.ParseString(scpuset)
		if err != nil {
			log.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err)
			return errors.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err)
		}
		input := new(computeapi.ServerCPUSetInput)
		err = cpusetJson.Unmarshal(input)
		if err != nil {
			log.Errorf("failed unmarshal server %s cpuset %s", s.Id, err)
			return errors.Errorf("failed unmarshal server %s cpuset %s", s.Id, err)
		}
		cpus := input.CPUS
		s.Desc.VcpuPin = []desc.SCpuPin{
			{
				Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1),
				Pcpus: cpuset.NewCPUSet(cpus...).String(),
			},
		}
		return nil
	}
	var cpus = make([]int, 0)
	var preferNumaNodes = make([]int8, 0)
	// Prefer the NUMA node of the first isolated device that declares one.
	for i := range s.Desc.IsolatedDevices {
		if s.Desc.IsolatedDevices[i].NumaNode >= 0 {
			preferNumaNodes = append(preferNumaNodes, s.Desc.IsolatedDevices[i].NumaNode)
			break
		}
	}
	log.Infof("guest %s prefer nodes %v", s.Id, preferNumaNodes)
	nodeNumaCpus, err := s.manager.cpuSet.AllocCpuset(int(s.Desc.Cpu), s.Desc.Mem*1024, preferNumaNodes, s.GetId())
	if err != nil {
		return err
	}
	for _, numaCpus := range nodeNumaCpus {
		cpus = append(cpus, numaCpus.Cpuset...)
	}
	if !s.manager.hostagentNumaAllocate {
		s.Desc.VcpuPin = []desc.SCpuPin{
			{
				Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1),
				Pcpus: cpuset.NewCPUSet(cpus...).String(),
			},
		}
	} else {
		var cpuNumaPin = make([]*desc.SCpuNumaPin, 0)
		for nodeId, numaCpus := range nodeNumaCpus {
			// NOTE(review): this inner check is always true inside the else
			// branch of !hostagentNumaAllocate — appears redundant.
			if s.manager.hostagentNumaAllocate {
				unodeId := uint16(nodeId)
				vcpuPin := make([]desc.SVCpuPin, len(numaCpus.Cpuset))
				for i := range numaCpus.Cpuset {
					vcpuPin[i].Pcpu = numaCpus.Cpuset[i]
					if i < int(s.Desc.Cpu) {
						vcpuPin[i].Vcpu = i
					} else {
						// Extra pinned pcpus beyond the vcpu count carry no vcpu.
						vcpuPin[i].Vcpu = -1
					}
				}
				memPin := &desc.SCpuNumaPin{
					SizeMB:    numaCpus.MemSizeKB / 1024, // MB
					NodeId:    &unodeId,
					VcpuPin:   vcpuPin,
					Unregular: numaCpus.Unregular,
				}
				cpuNumaPin = append(cpuNumaPin, memPin)
			}
		}
		s.Desc.CpuNumaPin = cpuNumaPin
	}
	return SaveLiveDesc(s, s.Desc)
}

// doContainerStartPostLifecycle runs the container's post-start lifecycle
// hook, if one is declared in the spec.
func (s *sPodGuestInstance) doContainerStartPostLifecycle(ctx context.Context, criId string, input *hostapi.ContainerCreateInput) error {
	ls := input.Spec.Lifecyle
	if ls == nil {
		return nil
	}
	if ls.PostStart == nil {
		return nil
	}
	drv := lifecycle.GetDriver(ls.PostStart.Type)
	if err := drv.Run(ctx, ls.PostStart, s.getCRI(), criId); err != nil {
		return errors.Wrapf(err, "run %s", ls.PostStart.Type)
	}
	return nil
}

// StopAll stops every container of the pod (collecting all errors) and then
// stops the pod sandbox itself with a 5 second timeout.
func (s *sPodGuestInstance) StopAll(ctx context.Context) error {
	ctrs := s.GetContainers()
	userCred := hostutils.GetComputeSession(ctx).GetToken()
	errs := []error{}
	for _, ctr := range ctrs {
		if _, err := s.StopContainer(ctx, userCred, ctr.Id, &hostapi.ContainerStopInput{}); err != nil {
			errs = append(errs, errors.Wrapf(err, "failed to stop container %s", ctr.Id))
		}
	}
	err := errors.NewAggregate(errs)
	if err != nil {
		return err
	}
	if err := s.stopPod(ctx, 5); err != nil {
		return errors.Wrapf(err, "stop pod %s", s.GetName())
	}
	return nil
}

// StopContainer stops a container via CRI, first unmounting its dedicated
// /dev/shm when an oversized shm (>64MB) was mounted for it. A container with
// no CRI id is treated as already stopped. Force-stop sleeps 2s afterwards so
// the PLEG can observe the transition.
func (s *sPodGuestInstance) StopContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerStopInput) (jsonutils.JSONObject, error) {
	criId, err := s.getContainerCRIId(ctrId)
	if err != nil {
		if errors.Cause(err) == errors.ErrNotFound {
			// cri id not found, should already stopped
			return nil, nil
		}
		return nil, errors.Wrap(err, "get container cri id")
	}
	var timeout int64 = 0
	// Best-effort expected-status update; error intentionally ignored here.
	s.expectedStatus.SetContainerStatus(criId, ctrId, computeapi.CONTAINER_STATUS_EXITED)
	if input.Timeout != 0 {
		timeout = input.Timeout
	}
	shmSizeMB := input.ShmSizeMB
	if shmSizeMB > 64 {
		name := input.ContainerName
		if name == "" {
			return nil, errors.Wrapf(errors.ErrNotFound, "not found container_name from input: %s", jsonutils.Marshal(input))
		}
		if err := s.unmountDevShm(name); err != nil {
			return nil, errors.Wrapf(err, "unmount shm %s", name)
		}
	}
	if err := s.getCRI().StopContainer(ctx, criId, timeout, true, input.Force); err != nil {
		if !IsContainerNotFoundError(err) {
			return nil, errors.Wrap(err, "CRI.StopContainer")
		} else {
			log.Warningf("CRI.StopContainer %s not found", criId)
		}
	}
	if input.Force {
		// FIXME: sleep 2s to wait for pleg.PodLifecycleEventGenerator to
		// refresh; an active refresh notification could be added later.
		time.Sleep(2 * time.Second)
	}
	if err := s.startStat.RemoveContainerFile(ctrId); err != nil {
		return nil, errors.Wrap(err, "startStat.RemoveContainerFile")
	}
	return nil, nil
}

// GetCRIId returns the sandbox CRI id stored in the pod's metadata.
func (s *sPodGuestInstance) GetCRIId() string {
	return s.GetSourceDesc().Metadata[computeapi.POD_METADATA_CRI_ID]
}

// convertToPodMetadataPortMappings translates CRI port mappings into the
// compute-API metadata representation (protocol enum conversion only).
func (s *sPodGuestInstance) convertToPodMetadataPortMappings(pms []*runtimeapi.PortMapping) []*computeapi.PodMetadataPortMapping {
	ret := make([]*computeapi.PodMetadataPortMapping, len(pms))
	for idx := range pms {
		pm := pms[idx]
		var proto computeapi.PodPortMappingProtocol = computeapi.PodPortMappingProtocolTCP
		if pm.Protocol == runtimeapi.Protocol_UDP {
			proto = computeapi.PodPortMappingProtocolUDP
		}
		ret[idx] = &computeapi.PodMetadataPortMapping{
			Protocol:      proto,
			ContainerPort: pm.ContainerPort,
			HostPort:      pm.HostPort,
			HostIp:        pm.HostIp,
		}
	}
	return ret
}

// setPortMappings records the pod's port mappings both in the region-side
// server metadata and in the locally persisted desc.
func (s *sPodGuestInstance) setPortMappings(ctx context.Context, userCred mcclient.TokenCredential, pms []*computeapi.PodMetadataPortMapping) error {
	pmStr := jsonutils.Marshal(pms).String()
	s.Desc.Metadata[computeapi.POD_METADATA_PORT_MAPPINGS] = pmStr
	session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
	if _, err := computemod.Servers.SetMetadata(session, s.GetId(), jsonutils.Marshal(map[string]string{
		computeapi.POD_METADATA_PORT_MAPPINGS: pmStr,
	})); err != nil {
		// NOTE(review): message says "cri_id" but this sets port mappings —
		// looks like a copy-paste from setCRIInfo; confirm before changing.
		return errors.Wrapf(err, "set cri_id of pod %s", s.GetId())
	}
	return SaveDesc(s, s.Desc)
}

// setCRIInfo stores the sandbox CRI id and the full sandbox config in the
// region-side server metadata and in the locally persisted desc.
func (s *sPodGuestInstance) setCRIInfo(ctx context.Context, userCred mcclient.TokenCredential, criId string, cfg *runtimeapi.PodSandboxConfig) error {
	s.Desc.Metadata[computeapi.POD_METADATA_CRI_ID] = criId
	cfgStr := jsonutils.Marshal(cfg).String()
	s.Desc.Metadata[computeapi.POD_METADATA_CRI_CONFIG] = cfgStr
	session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
	if _, err := computemod.Servers.SetMetadata(session, s.GetId(), jsonutils.Marshal(map[string]string{
		computeapi.POD_METADATA_CRI_ID:     criId,
		computeapi.POD_METADATA_CRI_CONFIG: cfgStr,
	})); err != nil {
		return errors.Wrapf(err, "set cri_id of pod %s", s.GetId())
	}
	return SaveDesc(s, s.Desc)
}

// setContainerCRIInfo stores a container's CRI id in the region-side
// container metadata.
func (s *sPodGuestInstance) setContainerCRIInfo(ctx context.Context, userCred mcclient.TokenCredential, ctrId, criId string) error {
	session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
	if _, err := computemod.Containers.SetMetadata(session, ctrId, jsonutils.Marshal(map[string]string{
		computeapi.CONTAINER_METADATA_CRI_ID: criId,
	})); err != nil {
		return errors.Wrapf(err, "set cri_id of container %s", ctrId)
	}
	return nil
}

// getPodSandboxConfig reconstructs the PodSandboxConfig saved in the pod's
// metadata by setCRIInfo.
func (s *sPodGuestInstance) getPodSandboxConfig() (*runtimeapi.PodSandboxConfig, error) {
	cfgStr := s.GetSourceDesc().Metadata[computeapi.POD_METADATA_CRI_CONFIG]
	obj, err := jsonutils.ParseString(cfgStr)
	if err != nil {
		return nil, errors.Wrapf(err, "ParseString to json object: %s", cfgStr)
	}
	podCfg := new(runtimeapi.PodSandboxConfig)
	if err := obj.Unmarshal(podCfg); err != nil {
		return nil, errors.Wrap(err, "Unmarshal to PodSandboxConfig")
	}
	return podCfg, nil
}

// GetPortMappings flattens the port mappings declared on all of the pod's
// NICs in the source desc.
func (s *sPodGuestInstance) GetPortMappings() (computeapi.GuestPortMappings, error) {
	srcDesc := s.GetSourceDesc()
	nics := srcDesc.Nics
	pms := make([]*computeapi.GuestPortMapping, 0)
	for _, nic := range nics {
		for _, pm := range nic.PortMappings {
			tmpPm := pm
			pms = append(pms, tmpPm)
		}
	}
	return pms, nil
}

// saveContainer records the id -> CRI-id association for a newly created
// container and persists the containers file. Guarded by saveContainerLock.
func (s *sPodGuestInstance) saveContainer(id string, criId string) error {
	s.saveContainerLock.Lock()
	defer s.saveContainerLock.Unlock()
	_, ok := s.containers[id]
	if ok {
		// NOTE(review): the duplicate check is on id, but the message prints
		// criId — presumably it should print id; confirm before changing.
		return errors.Errorf("container %s already exists", criId)
	}
	ctr := newContainer(id)
	ctr.CRIId = criId
	s.containers[id] = ctr
	if err := s.saveContainersFile(s.containers); err != nil {
		return errors.Wrap(err, "saveContainersFile")
	}
	return nil
}

// saveContainersFile serializes the container map to the containers file.
func (s *sPodGuestInstance) saveContainersFile(containers map[string]*sContainer) error {
	content := jsonutils.Marshal(containers).String()
	if err := fileutils2.FilePutContents(s.getContainersFilePath(), content, false); err != nil {
		return errors.Wrapf(err, "put content %s to containers file", content)
	}
	return nil
}

// getContainersFilePath returns the path of the per-pod containers file.
func (s *sPodGuestInstance) getContainersFilePath() string {
	return path.Join(s.HomeDir(), "containers")
}

// CreateContainer pulls the container image (decoding an optional base64
// JSON credential token), creates the CRI container, applies cgroup-v2
// device rules through the containerd API when applicable, and records the
// container's CRI id in region metadata.
func (s *sPodGuestInstance) CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
	// always pull image for checking
	imgInput := &hostapi.ContainerPullImageInput{
		Image:      input.Spec.Image,
		PullPolicy: input.Spec.ImagePullPolicy,
	}
	if input.Spec.ImageCredentialToken != "" {
		// The credential token is base64-encoded JSON registry auth.
		tokenJson, err := base64.StdEncoding.DecodeString(input.Spec.ImageCredentialToken)
		if err != nil {
			return nil, errors.Wrapf(err, "base64 decode image credential token %s", input.Spec.ImageCredentialToken)
		}
		authObj, err := jsonutils.Parse(tokenJson)
		if err != nil {
			return nil, errors.Wrapf(err, "parse image credential token %s", input.Spec.ImageCredentialToken)
		}
		imgAuth := new(apis.ContainerPullImageAuthConfig)
		if err := authObj.Unmarshal(imgAuth); err != nil {
			return nil,
errors.Wrapf(err, "unmarshal image credential token: %s", authObj) } imgInput.Auth = imgAuth } if _, err := s.PullImage(ctx, userCred, id, imgInput); err != nil { return nil, errors.Wrapf(err, "pull image %s", input.Spec.Image) } pod, err := s.getPod(ctx) if err != nil { return nil, errors.Wrap(err, "get pod") } ctrCriId, err := s.createContainer(ctx, userCred, pod, id, input) if err != nil { return nil, errors.Wrap(err, "CRI.CreateContainer") } // 如果是 cgroup v2,通过 containerd API 更新 container spec 中的 devices isV2, err := podutil.DetectCgroupVersion() if err != nil { log.Warningf("[CreateContainer] Failed to detect cgroup version: %v, skipping containerd device update", err) } else if isV2 { // 将设备规则转换为 specs.LinuxDeviceCgroup specDevices, err := podutil.ConvertDeviceRulesToSpecsDevices(input.Spec.CgroupDevicesAllow) if err != nil { log.Warningf("[CreateContainer] Failed to convert device rules to specs devices: %v, skipping containerd device update", err) } else if len(specDevices) > 0 { if err := s.updateContainerDevicesViaContainerd(ctx, ctrCriId, specDevices); err != nil { log.Warningf("[CreateContainer] Failed to update container devices via containerd API: %v (container created but devices may not be accessible)", err) // 不返回错误,因为容器已经创建成功 } } } if err := s.setContainerCRIInfo(ctx, userCred, id, ctrCriId); err != nil { return nil, errors.Wrap(err, "setContainerCRIInfo") } return nil, nil } func (s *sPodGuestInstance) getLxcfsMounts() []*runtimeapi.Mount { // lxcfsPath := "/var/lib/lxc/lxcfs" lxcfsPath := options.HostOptions.LxcfsPath const ( procCpuinfo = "/proc/cpuinfo" procDiskstats = "/proc/diskstats" procMeminfo = "/proc/meminfo" procStat = "/proc/stat" procSwaps = "/proc/swaps" procUptime = "/proc/uptime" ) newLxcfsMount := func(fp string) *runtimeapi.Mount { return &runtimeapi.Mount{ ContainerPath: fp, HostPath: fmt.Sprintf("%s%s", lxcfsPath, fp), } } return []*runtimeapi.Mount{ newLxcfsMount(procUptime), newLxcfsMount(procMeminfo), 
newLxcfsMount(procStat), newLxcfsMount(procCpuinfo), newLxcfsMount(procSwaps), newLxcfsMount(procDiskstats), } } func (s *sPodGuestInstance) getContainerMounts(ctrId string, input *hostapi.ContainerCreateInput) ([]*runtimeapi.Mount, error) { inputMounts := input.Spec.VolumeMounts if len(inputMounts) == 0 { return make([]*runtimeapi.Mount, 0), nil } mounts := make([]*runtimeapi.Mount, len(inputMounts)) for idx, im := range inputMounts { mnt := &runtimeapi.Mount{ ContainerPath: im.MountPath, Readonly: im.ReadOnly, SelinuxRelabel: im.SelinuxRelabel, Propagation: volume_mount.GetRuntimeVolumeMountPropagation(im.Propagation), } hostPath, err := volume_mount.GetDriver(im.Type).GetRuntimeMountHostPath(s, ctrId, im) if err != nil { return nil, errors.Wrapf(err, "get runtime host mount path of %s", jsonutils.Marshal(im)) } mnt.HostPath = hostPath mounts[idx] = mnt } return mounts, nil } func (s *sPodGuestInstance) getCGUtil() podutil.CgroupUtil { cgUtil, err := podutil.NewPodCgroupUtil(PodCgroupParent()) if err != nil { // 如果检测失败,记录错误并使用默认的 v1 实现 log.Errorf("failed to detect cgroup version, fallback to v1: %s", err) return podutil.NewPodCgroupV1Util(PodCgroupParent()) } return cgUtil } // updateContainerDevicesViaContainerd 通过 containerd API 更新 container spec 中的 devices // 用于 cgroup v2 场景 func (s *sPodGuestInstance) updateContainerDevicesViaContainerd(ctx context.Context, criId string, devices []*specs.LinuxDeviceCgroup) error { addr, namespace := GetContainerdConnectionInfo() // 创建 containerd client client, err := containerdutil.NewClient(ctx, addr, namespace) if err != nil { return errors.Wrap(err, "create containerd client") } defer client.Close() // 更新 container devices return containerdutil.UpdateContainerDevices(ctx, client, criId, devices) } func (s *sPodGuestInstance) setContainerCgroupDevicesAllow(ctrId string, allowStrs []string, devices []*hostapi.ContainerDevice) error { // 自动从容器设备配置中提取设备号并添加到允许列表 allowSet := make(map[string]bool) for _, rule := range allowStrs 
{ allowSet[rule] = true } // 遍历容器设备,提取设备路径并生成设备规则 for _, dev := range devices { var devicePath string var permissions string = "rwm" // 默认权限 // 获取设备路径和权限 if dev.Host != nil && dev.Host.HostPath != "" { devicePath = dev.Host.HostPath if dev.Permissions != "" { permissions = dev.Permissions } } else if dev.IsolatedDevice != nil { // 对于 IsolatedDevice,优先使用 RenderPath,然后是 Path if dev.IsolatedDevice.RenderPath != "" { devicePath = dev.IsolatedDevice.RenderPath } else if dev.IsolatedDevice.Path != "" { devicePath = dev.IsolatedDevice.Path } else if dev.IsolatedDevice.CardPath != "" { devicePath = dev.IsolatedDevice.CardPath } if dev.Permissions != "" { permissions = dev.Permissions } } else { log.Debugf("[setContainerCgroupDevicesAllow] Skipping device %v (no Host or IsolatedDevice path)", dev) continue } if devicePath == "" { log.Debugf("[setContainerCgroupDevicesAllow] Skipping device %v (no device path found)", dev) continue } // 从设备路径获取设备号并生成设备规则 rule, err := podutil.GetDeviceAllowRuleFromPath(devicePath, permissions) if err != nil { log.Warningf("[setContainerCgroupDevicesAllow] Failed to get device rule from path %s: %v (skipping)", devicePath, err) continue } // 添加到允许列表(去重) if !allowSet[rule] { allowStrs = append(allowStrs, rule) allowSet[rule] = true log.Infof("[setContainerCgroupDevicesAllow] Auto-added device rule: %s (from device path: %s)", rule, devicePath) } } return s.getCGUtil().SetDevicesAllow(ctrId, allowStrs) } func (s *sPodGuestInstance) SetContainerResourceLimit(ctrId string, limit *apis.ContainerResources) (jsonutils.JSONObject, error) { criId, err := s.getContainerCRIId(ctrId) if err != nil { return nil, errors.Wrap(err, "get container cri id") } return nil, s.setContainerResourcesLimit(criId, limit) } func (s *sPodGuestInstance) setContainerResourcesLimit(ctrId string, limit *apis.ContainerResources) error { cgUtil := s.getCGUtil() /*if limit.MemoryLimitMB != nil { if err := cgUtil.SetMemoryLimitBytes(ctrId, *limit.MemoryLimitMB*1024*1024); err != 
nil { return errors.Wrapf(err, "set memory limit to %d MB", *limit.MemoryLimitMB) } }*/ if limit.CpuCfsQuota != nil { cpuCfsQuotaUs := *limit.CpuCfsQuota * float64(s.getDefaultCPUPeriod()) if err := cgUtil.SetCPUCfs(ctrId, int64(cpuCfsQuotaUs), s.getDefaultCPUPeriod()); err != nil { return errors.Wrapf(err, "set cpu cfs quota to %d", int64(cpuCfsQuotaUs)) } } if limit.PidsMax != nil { if err := cgUtil.SetPidsMax(ctrId, *limit.PidsMax); err != nil { return errors.Wrapf(err, "set pids.max to %d", *limit.PidsMax) } } if len(limit.DevicesAllow) != 0 { if err := cgUtil.SetDevicesAllow(ctrId, limit.DevicesAllow); err != nil { return errors.Wrapf(err, "set devices.allow %v", limit.DevicesAllow) } } if limit.CpusetCloneChildren { if err := cgUtil.SetCpusetCloneChildren(ctrId); err != nil { return errors.Wrapf(err, "set cpuset clone_children") } } if limit.MemoryHighRatio != nil { memHigh := int64(*limit.MemoryHighRatio * float64(s.GetDesc().Mem*1024*1024)) if err := cgUtil.SetCgroupKeyValue(ctrId, podutil.CgroupControllerMemory, "memory.high", fmt.Sprintf("%d", memHigh)); err != nil { return errors.Wrapf(err, "set memory.high to %d", memHigh) } } return nil } func (s *sPodGuestInstance) getDefaultCPUPeriod() int64 { return 100000 } func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclient.TokenCredential, sandbox *runtimeapi.PodSandbox, ctrId string, input *hostapi.ContainerCreateInput) (string, error) { podCfg, err := s.getPodSandboxConfig() if err != nil { return "", errors.Wrap(err, "getPodSandboxConfig") } spec := input.Spec mounts, err := s.getContainerMounts(ctrId, input) if err != nil { return "", errors.Wrap(err, "get container mounts") } if spec.SimulateCpu { systemCpuMounts, err := s.simulateContainerSystemCpu(ctx, ctrId) if err != nil { return "", errors.Wrapf(err, "simulate container system cpu") } newMounts := systemCpuMounts newMounts = append(newMounts, mounts...) 
mounts = newMounts } // process shm size if spec.ShmSizeMB > 64 { // mount empty dir shmPath, err := s.mountDevShm(input, spec.ShmSizeMB) if err != nil { return "", errors.Wrapf(err, "mount dev shm") } mounts = append(mounts, &runtimeapi.Mount{ ContainerPath: "/dev/shm", HostPath: shmPath, }) } // inject /etc/hosts to hide host storage if spec.Rootfs != nil { if etcFilesMount, err := s.getEtcFilesMount(ctrId); err != nil { return "", errors.Wrapf(err, "get etc hosts mount") } else { mounts = append(mounts, etcFilesMount...) } } var cpuSetCpus string var cpuSetMems string var extraCpuCount int { cpuSets := sets.NewString() cpuMemSets := sets.NewString() if len(s.Desc.CpuNumaPin) > 0 { for _, cpuNumaPin := range s.GetDesc().CpuNumaPin { for _, cpuPin := range cpuNumaPin.VcpuPin { cpuSets.Insert(fmt.Sprintf("%d", cpuPin.Pcpu)) } if cpuNumaPin.NodeId != nil && cpuNumaPin.SizeMB > 0 { cpuMemSets.Insert(fmt.Sprintf("%d", int(*cpuNumaPin.NodeId))) } if cpuNumaPin.ExtraCpuCount > 0 { extraCpuCount += cpuNumaPin.ExtraCpuCount } } cpuSetCpus = strings.Join(cpuSets.List(), ",") cpuSetMems = strings.Join(cpuMemSets.List(), ",") } else if len(s.Desc.VcpuPin) > 0 { for _, vcpuPin := range s.Desc.VcpuPin { cpuSets.Insert(vcpuPin.Pcpus) } cpuSetCpus = strings.Join(cpuSets.List(), ",") } } procMountType := apis.ContainerDefaultProcMount if spec.SecurityContext != nil && spec.SecurityContext.ProcMount == apis.ContainerUnmaskedProcMount { procMountType = apis.ContainerUnmaskedProcMount } ctrCfg := &runtimeapi.ContainerConfig{ Metadata: &runtimeapi.ContainerMetadata{ Name: input.Name, Attempt: uint32(input.RestartCount), }, Labels: map[string]string{ runtime.PodNameLabel: s.GetDesc().Name, runtime.PodUIDLabel: s.GetId(), runtime.ContainerNameLabel: input.Name, runtime.ContainerRestartCountLabel: fmt.Sprintf("%d", input.RestartCount), }, Annotations: map[string]string{ runtime.ContainerRestartCountLabel: fmt.Sprintf("%d", input.RestartCount), }, Image: &runtimeapi.ImageSpec{ Image: 
spec.Image, }, Linux: &runtimeapi.LinuxContainerConfig{ Resources: &runtimeapi.LinuxContainerResources{ // REF: https://docs.docker.com/config/containers/resource_constraints/#configure-the-default-cfs-scheduler CpuPeriod: s.getDefaultCPUPeriod(), CpuQuota: (s.GetDesc().Cpu + int64(extraCpuCount)) * s.getDefaultCPUPeriod(), //CpuShares: defaultCPUPeriod, MemoryLimitInBytes: s.GetDesc().Mem * 1024 * 1024, OomScoreAdj: 0, CpusetCpus: cpuSetCpus, CpusetMems: cpuSetMems, HugepageLimits: nil, Unified: nil, MemorySwapLimitInBytes: 0, }, SecurityContext: &runtimeapi.LinuxContainerSecurityContext{ Capabilities: &runtimeapi.Capability{}, Privileged: spec.Privileged, NamespaceOptions: podCfg.Linux.SecurityContext.GetNamespaceOptions(), SelinuxOptions: nil, RunAsUser: nil, RunAsGroup: nil, RunAsUsername: "", ReadonlyRootfs: false, SupplementalGroups: nil, NoNewPrivs: !spec.DisableNoNewPrivs, MaskedPaths: podutil.GetDefaultMaskedPaths(procMountType), ReadonlyPaths: podutil.GetReadonlyPaths(procMountType), Seccomp: &runtimeapi.SecurityProfile{ ProfileType: runtimeapi.SecurityProfile_Unconfined, }, Apparmor: &runtimeapi.SecurityProfile{ ProfileType: runtimeapi.SecurityProfile_Unconfined, }, ApparmorProfile: "", SeccompProfilePath: "", }, }, LogPath: s.getContainerLogPath(ctrId), Envs: make([]*runtimeapi.KeyValue, 0), Devices: []*runtimeapi.Device{}, Mounts: mounts, } // set container namespace options to target /*if ctrCfg.Linux.SecurityContext.NamespaceOptions.Pid == runtimeapi.NamespaceMode_CONTAINER { ctrCfg.Linux.SecurityContext.NamespaceOptions.Pid = runtimeapi.NamespaceMode_TARGET ctrCfg.Linux.SecurityContext.NamespaceOptions.TargetId = s.GetCRIId() }*/ if spec.Rootfs != nil { ctrCfg.Labels[snapshot_service.LabelServerId] = s.GetId() ctrCfg.Labels[snapshot_service.LabelContainerId] = ctrId } // inherit security context if spec.SecurityContext != nil { secInput := spec.SecurityContext if secInput.RunAsUser != nil { ctrCfg.Linux.SecurityContext.RunAsUser = 
&runtimeapi.Int64Value{ Value: *secInput.RunAsUser, } } if secInput.RunAsGroup != nil { ctrCfg.Linux.SecurityContext.RunAsGroup = &runtimeapi.Int64Value{ Value: *secInput.RunAsGroup, } } if secInput.ApparmorProfile != "" { ctrCfg.Linux.SecurityContext.Apparmor = &runtimeapi.SecurityProfile{ ProfileType: runtimeapi.SecurityProfile_Localhost, LocalhostRef: secInput.ApparmorProfile, } } } if spec.EnableLxcfs { ctrCfg.Mounts = append(ctrCfg.Mounts, s.getLxcfsMounts()...) } if spec.Capabilities != nil { ctrCfg.Linux.SecurityContext.Capabilities.AddCapabilities = spec.Capabilities.Add ctrCfg.Linux.SecurityContext.Capabilities.DropCapabilities = spec.Capabilities.Drop } envSecs := make(map[string]map[string]string, 0) if len(spec.SecretCredentials) > 0 { for credId, credData := range spec.SecretCredentials { obj := map[string]string{} credKey, err := base64.StdEncoding.DecodeString(credData) if err != nil { return "", errors.Wrapf(err, "decode secret credential %s", credId) } if err := json.Unmarshal(credKey, &obj); err != nil { return "", errors.Wrapf(err, "unmarshal secret credential %s", credId) } envSecs[credId] = obj } for i := range spec.Envs { env := spec.Envs[i] if env.ValueFrom == nil { continue } if env.ValueFrom.Credential != nil { credId := env.ValueFrom.Credential.Id if _, ok := envSecs[credId]; !ok { return "", errors.Wrapf(errors.ErrNotFound, "secret credential %s not found", credId) } credKey := env.ValueFrom.Credential.Key if _, ok := envSecs[credId][credKey]; !ok { return "", errors.Wrapf(errors.ErrNotFound, "secret credential %s key %s not found", credId, credKey) } env.Value = envSecs[credId][credKey] } } } for _, env := range spec.Envs { ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{ Key: env.Key, Value: env.Value, }) } pms, err := s.GetPortMappings() if err != nil { return "", errors.Wrapf(err, "get pod port mappings") } if len(pms) != 0 { for _, pm := range pms { envKey := fmt.Sprintf("CLOUDPODS_%s_PORT_%d", 
strings.ToUpper(string(pm.Protocol)), pm.Port) envVal := fmt.Sprintf("%d", *pm.HostPort) ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{ Key: envKey, Value: envVal, }) for _, pEnv := range pm.Envs { pEnvVal := "" switch pEnv.ValueFrom { case computeapi.GuestPortMappingEnvValueFromHostPort: pEnvVal = fmt.Sprintf("%d", *pm.HostPort) case computeapi.GuestPortMappingEnvValueFromPort: pEnvVal = fmt.Sprintf("%d", pm.Port) default: return "", httperrors.NewInputParameterError("invalid value from %s", pEnv.ValueFrom) } ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{ Key: pEnv.Key, Value: pEnvVal, }) } } } if s.GetDesc().HostAccessIp != "" { ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{ Key: "CLOUDPODS_HOST_ACCESS_IP", Value: s.GetDesc().HostAccessIp, }) } if s.GetDesc().HostEIP != "" { ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{ Key: "CLOUDPODS_HOST_EIP", Value: s.GetDesc().HostEIP, }) } if len(spec.Devices) != 0 { devsByType := map[apis.ContainerDeviceType][]*hostapi.ContainerDevice{} for i := range spec.Devices { if devs, ok := devsByType[spec.Devices[i].Type]; ok { devsByType[spec.Devices[i].Type] = append(devs, spec.Devices[i]) } else { devsByType[spec.Devices[i].Type] = []*hostapi.ContainerDevice{spec.Devices[i]} } } for cdType, devs := range devsByType { ctrDevs, err := device.GetDriver(cdType).GetRuntimeDevices(input, devs) if err != nil { return "", errors.Wrapf(err, "GetRuntimeDevices of %s", jsonutils.Marshal(devs)) } ctrCfg.Devices = append(ctrCfg.Devices, ctrDevs...) 
		}
		// Apply per-device-type extra envs/mounts/CDI devices for isolated devices.
		if err := s.getIsolatedDeviceExtraConfig(spec, ctrCfg); err != nil {
			return "", err
		}
	} else {
		if hostinfo.Instance().HasContainerNvidiaGpu() {
			// No GPU devices were requested on a GPU-capable host: explicitly hide
			// all NVIDIA devices from the nvidia container runtime hook.
			ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{Key: "NVIDIA_VISIBLE_DEVICES", Value: "void"})
		}
	}
	if len(spec.Command) != 0 {
		ctrCfg.Command = spec.Command
	}
	if len(spec.Args) != 0 {
		ctrCfg.Args = spec.Args
	}
	// Create (but do not start) the container through CRI inside the pod sandbox.
	criId, err := s.getCRI().CreateContainer(ctx, sandbox.GetId(), podCfg, ctrCfg, false)
	if err != nil {
		return "", errors.Wrapf(err, "cri.CreateContainer of pod: %q", sandbox.GetId())
	}
	// Persist the local container-id -> CRI-id mapping.
	if err := s.saveContainer(ctrId, criId); err != nil {
		return "", errors.Wrap(err, "saveContainer")
	}
	return criId, nil
}

// copyEtcFile copies an etc file from the host into the container rootfs
// staging area under hostPath and returns the bind-mount entry for it.
func (s *sPodGuestInstance) copyEtcFile(hostPath, etcFilePath string) (*runtimeapi.Mount, error) {
	hostEtcFilePath := filepath.Join(hostPath, etcFilePath)
	// Make sure the parent directory exists.
	if err := volume_mount.EnsureDir(filepath.Dir(hostEtcFilePath)); err != nil {
		return nil, errors.Wrapf(err, "ensure dir %s", filepath.Dir(hostEtcFilePath))
	}
	// Copy the file.
	if err := volume_mount.CopyFile(etcFilePath, hostEtcFilePath); err != nil {
		return nil, errors.Wrapf(err, "copy file %s to %s", etcFilePath, hostEtcFilePath)
	}
	// Build the mount point.
	return &runtimeapi.Mount{
		ContainerPath: etcFilePath,
		HostPath:      hostEtcFilePath,
	}, nil
}

// generateEtcFile writes the given content as an etc file under hostPath and
// returns the bind-mount entry for it.
func (s *sPodGuestInstance) generateEtcFile(hostPath, etcFilePath, content string) (*runtimeapi.Mount, error) {
	hostEtcFilePath := filepath.Join(hostPath, etcFilePath)
	// Make sure the parent directory exists.
	if err := volume_mount.EnsureDir(filepath.Dir(hostEtcFilePath)); err != nil {
		return nil, errors.Wrapf(err, "ensure dir %s", filepath.Dir(hostEtcFilePath))
	}
	// Write the file content.
	if err := fileutils2.FilePutContents(hostEtcFilePath, content, false); err != nil {
		return nil, errors.Wrapf(err, "put file %s to %s", etcFilePath, hostEtcFilePath)
	}
	// Build the mount point.
	return &runtimeapi.Mount{
		ContainerPath: etcFilePath,
		HostPath:      hostEtcFilePath,
	}, nil
}

// getEtcFilesMount prepares /etc/hosts, /etc/hostname and /etc/resolv.conf
// inside the container rootfs mount path and returns their bind mounts.
func (s *sPodGuestInstance) getEtcFilesMount(ctrId string) ([]*runtimeapi.Mount, error) {
	hostPath, err := s.GetRootFsMountPath(ctrId)
	if err != nil {
		return nil, errors.Wrapf(err, "get container root fs path of %s", ctrId)
	}
	// Copy the host's /etc/hosts.
	etcHostsMount, err := s.copyEtcFile(hostPath, "/etc/hosts")
	if err != nil {
		return nil, errors.Wrap(err, "copy /etc/hosts")
	}
	// Generate /etc/hostname from the guest description.
	etcHostnameMount, err := s.generateEtcFile(hostPath, "/etc/hostname", s.GetDesc().Hostname)
	if err != nil {
		return nil, errors.Wrap(err, "generate /etc/hostname")
	}
	// Copy the host's /etc/resolv.conf.
	etcResolvConfMount, err := s.copyEtcFile(hostPath, "/etc/resolv.conf")
	if err != nil {
		return nil, errors.Wrap(err, "copy /etc/resolv.conf")
	}
	return []*runtimeapi.Mount{etcHostsMount, etcHostnameMount, etcResolvConfMount}, nil
}

// FilteredContainerDevices groups a container's isolated devices by how they
// are surfaced to the container runtime.
type FilteredContainerDevices struct {
	// EnvDevs only inject environment variables.
	EnvDevs []*hostapi.ContainerDevice
	// CDIDevs are passed through the Container Device Interface.
	CDIDevs []*hostapi.ContainerDevice
	// RestDevs go through their device manager's extra configures.
	RestDevs []*hostapi.ContainerDevice
}

// filterContainerIsolatedDevices splits devs (restricted to devTypes) into
// env-only, CDI and the remaining isolated devices.
func filterContainerIsolatedDevices(devs []*hostapi.ContainerDevice, devTypes sets.String) FilteredContainerDevices {
	envDevs := []*hostapi.ContainerDevice{}
	restDevs := []*hostapi.ContainerDevice{}
	cdiDevs := []*hostapi.ContainerDevice{}
	for i := range devs {
		dev := devs[i]
		if dev.IsolatedDevice != nil {
			devType := dev.IsolatedDevice.DeviceType
			if !devTypes.Has(devType) {
				continue
			}
			if dev.IsolatedDevice.IsCDIUsed() {
				cdiDevs = append(cdiDevs, dev)
			} else if len(dev.IsolatedDevice.OnlyEnv) > 0 {
				envDevs = append(envDevs, dev)
			} else {
				restDevs = append(restDevs, dev)
			}
		}
	}
	return FilteredContainerDevices{
		EnvDevs:  envDevs,
		CDIDevs:  cdiDevs,
		RestDevs: restDevs,
	}
}

// getEnvsFromDevices renders the OnlyEnv entries of each isolated device into
// CRI environment variables. For one OnlyEnv entry the last matching source
// (render path, index, device minor — in that order) wins.
func getEnvsFromDevices(devs []*hostapi.ContainerDevice) []*runtimeapi.KeyValue {
	retEnvs := []*runtimeapi.KeyValue{}
	for _, dev := range devs {
		if dev.IsolatedDevice == nil {
			continue
		}
		if len(dev.IsolatedDevice.OnlyEnv) == 0 {
			continue
		}
		for _, oe := range dev.IsolatedDevice.OnlyEnv {
			var tmpEnv *runtimeapi.KeyValue
			if oe.FromRenderPath {
				tmpEnv = &runtimeapi.KeyValue{
					Key:   oe.Key,
					Value: dev.IsolatedDevice.RenderPath,
				}
			}
			if oe.FromIndex {
				tmpEnv = &runtimeapi.KeyValue{
					Key:   oe.Key,
					Value: fmt.Sprintf("%d", dev.IsolatedDevice.Index),
				}
			}
			if oe.FromDeviceMinor {
				tmpEnv = &runtimeapi.KeyValue{
					Key:   oe.Key,
					Value: fmt.Sprintf("%d", dev.IsolatedDevice.DeviceMinor),
				}
			}
			if tmpEnv != nil {
				retEnvs = append(retEnvs, tmpEnv)
			}
		}
	}
	return retEnvs
}

// getIsolatedDeviceExtraConfig augments ctrCfg with the environment
// variables, mounts and CDI devices required by the spec's GPU/NPU
// isolated devices.
func (s *sPodGuestInstance) getIsolatedDeviceExtraConfig(spec *hostapi.ContainerSpec, ctrCfg *runtimeapi.ContainerConfig) error {
	// Only these accelerator types get container-level extra configuration.
	devTypes := sets.NewString(
		string(isolated_device.ContainerDeviceTypeNvidiaGpu),
		string(isolated_device.ContainerDeviceTypeNvidiaMps),
		string(isolated_device.ContainerDeviceTypeNvidiaGpuShare),
		string(isolated_device.ContainerDeviceTypeAscendNpu),
	)
	fDevs := filterContainerIsolatedDevices(spec.Devices, devTypes)
	if len(fDevs.EnvDevs) != 0 {
		ctrCfg.Envs = append(ctrCfg.Envs, getEnvsFromDevices(fDevs.EnvDevs)...)
	}
	for i := range fDevs.RestDevs {
		dev := fDevs.RestDevs[i]
		devMan, err := isolated_device.GetContainerDeviceManager(isolated_device.ContainerDeviceType(dev.IsolatedDevice.DeviceType))
		if err != nil {
			return errors.Wrapf(err, "GetContainerDeviceManager by type %q", dev.IsolatedDevice.DeviceType)
		}
		envs, mounts := devMan.GetContainerExtraConfigures([]*hostapi.ContainerDevice{dev})
		if len(envs) > 0 {
			ctrCfg.Envs = append(ctrCfg.Envs, envs...)
		}
		if len(mounts) > 0 {
			ctrCfg.Mounts = append(ctrCfg.Mounts, mounts...)
		}
	}
	if len(fDevs.CDIDevs) > 0 {
		cdiDevs, err := isolated_device.GetContainerCDIDevices(fDevs.CDIDevs)
		if err != nil {
			return errors.Wrap(err, "GetContainerCDIDevices")
		}
		ctrCfg.CDIDevices = append(ctrCfg.CDIDevices, cdiDevs...)
	}
	return nil
}

// getContainerSystemCpusDir returns the host directory that backs the
// simulated /sys/devices/system/cpu tree of the container; it prefers the
// container rootfs mount path and falls back to the guest home dir.
func (s *sPodGuestInstance) getContainerSystemCpusDir(ctrId string) string {
	rootFsPath, _ := s.GetRootFsMountPath(ctrId)
	if rootFsPath != "" {
		return filepath.Join(rootFsPath, "cpus", ctrId)
	}
	return filepath.Join(s.HomeDir(), "cpus", ctrId)
}

// ensureContainerSystemCpuDir makes sure the simulated cpu directory exists
// and is populated for cpuCnt cpus.
func (s *sPodGuestInstance) ensureContainerSystemCpuDir(cpuDir string, cpuCnt int64) error {
	// create cpu dir like /var/lib/docker/cpus/$ctr_name
	if err := podutil.EnsureContainerSystemCpuDir(cpuDir, cpuCnt); err != nil {
		return errors.Wrap(err, "ensure container system cpu dir")
	}
	return nil
}

// findHostCpuPath maps a container-local cpu index to the host cpu index.
func (s *sPodGuestInstance) findHostCpuPath(ctrId string, cpuIndex int) (int, error) {
	return s.getHostCPUMap().Get(ctrId, cpuIndex)
}

// simulateContainerSystemCpuSetScalingCurFreq writes the given frequency into
// the simulated scaling_cur_freq/scaling_setspeed files of every policy dir.
func (s *sPodGuestInstance) simulateContainerSystemCpuSetScalingCurFreq(ctrId string, scalingCurFreq int64) error {
	cpuDir := s.getContainerSystemCpusDir(ctrId)
	cpuCnt := s.GetDesc().Cpu
	for i := 0; i < int(cpuCnt); i++ {
		cpufreqPolicyCurFreqFile := path.Join(cpuDir, "cpufreq", fmt.Sprintf("policy%d", i), "scaling_cur_freq")
		cpufreqPolicySetSpeedFile := path.Join(cpuDir, "cpufreq", fmt.Sprintf("policy%d", i), "scaling_setspeed")
		scalingCurFreqStr := fmt.Sprintf("%d\n", scalingCurFreq)
		if err := fileutils2.FilePutContents(cpufreqPolicyCurFreqFile, scalingCurFreqStr, false); err != nil {
			return errors.Wrapf(err, "failed write %s", cpufreqPolicyCurFreqFile)
		}
		if err := fileutils2.FilePutContents(cpufreqPolicySetSpeedFile, scalingCurFreqStr, false); err != nil {
			return errors.Wrapf(err, "failed write %s", cpufreqPolicySetSpeedFile)
		}
	}
	return nil
}

// simulateContainerSystemCpu builds the bind mounts that fake the container's
// /sys/devices/system/cpu view so in-container tools see only its own cpus.
func (s *sPodGuestInstance) simulateContainerSystemCpu(ctx context.Context, ctrId string) ([]*runtimeapi.Mount, error) {
	cpuDir := s.getContainerSystemCpusDir(ctrId)
	cpuCnt := s.GetDesc().Cpu
	if err := s.ensureContainerSystemCpuDir(cpuDir, cpuCnt); err != nil {
		return nil, err
	}
	cpufreqConfig := s.manager.host.GetContainerCpufreqSimulateConfig()
	if cpufreqConfig != nil {
		cpufreqDir := path.Join(cpuDir, "cpufreq")
		out, err :=
			procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", cpufreqDir).Output()
		if err != nil {
			return nil, errors.Wrapf(err, "mkdir %s: %s", cpufreqDir, out)
		}
	}
	sysCpuPath := "/sys/devices/system/cpu"
	// The whole simulated tree is mounted over /sys/devices/system/cpu.
	ret := []*runtimeapi.Mount{
		{
			ContainerPath: sysCpuPath,
			HostPath:      cpuDir,
		},
	}
	for i := 0; i < int(cpuCnt); i++ {
		hostCpuIdx, err := s.findHostCpuPath(ctrId, i)
		if err != nil {
			return nil, errors.Wrapf(err, "find host cpu by container %s with index %d", ctrId, i)
		}
		hostCpuPath := filepath.Join(sysCpuPath, fmt.Sprintf("cpu%d", hostCpuIdx))
		if cpufreqConfig != nil {
			// cpufreq simulation enabled: copy host cpu dirs and fabricate
			// cpufreq policy files instead of bind-mounting real cpu dirs.
			if err := s.ensureContainerSystemCpufreqHostDir(cpuDir, hostCpuPath, i, cpufreqConfig); err != nil {
				return nil, errors.Wrap(err, "ensureContainerSystemCpufreqHostDir")
			}
		} else {
			// Map container cpu%d to the assigned host cpu directory.
			ret = append(ret, &runtimeapi.Mount{
				ContainerPath: filepath.Join(sysCpuPath, fmt.Sprintf("cpu%d", i)),
				HostPath:      hostCpuPath,
			})
		}
	}
	// pathMap builds a read-only passthrough mount for a shared sysfs entry.
	pathMap := func(baseName string) *runtimeapi.Mount {
		p := filepath.Join(sysCpuPath, baseName)
		return &runtimeapi.Mount{
			ContainerPath: p,
			HostPath:      p,
			Readonly:      true,
		}
	}
	cpuConfigs := []string{"modalias", "power", "cpuidle", "hotplug", "isolated", "uevent"}
	if cpufreqConfig == nil {
		// Without simulation the real host cpufreq tree is passed through.
		cpuConfigs = append(cpuConfigs, "cpufreq")
	}
	for _, baseName := range cpuConfigs {
		ret = append(ret, pathMap(baseName))
	}
	return ret, nil
}

// ensureContainerSystemCpufreqHostDir fabricates the cpufreq policy%d
// directory for one simulated cpu: it copies the host cpu dir, replaces its
// cpufreq entry with a symlink to the fabricated policy dir, and fills the
// policy files from cpufreqConfig (affected/related_cpus come from cpuIdx).
func (s *sPodGuestInstance) ensureContainerSystemCpufreqHostDir(cpuDir, hostCpuPath string, cpuIdx int, cpufreqConfig *jsonutils.JSONDict) error {
	cpufreqPolicyDir := path.Join(cpuDir, "cpufreq", fmt.Sprintf("policy%d", cpuIdx))
	out, err := procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", cpufreqPolicyDir).Output()
	if err != nil {
		return errors.Wrapf(err, "mkdir %s: %s", cpufreqPolicyDir, out)
	}
	cpuiDir := path.Join(cpuDir, fmt.Sprintf("cpu%d", cpuIdx))
	// Best-effort copy of the host cpu directory; failure is only logged.
	out, err = procutils.NewRemoteCommandAsFarAsPossible("cp", "-rf", hostCpuPath, cpuiDir).Output()
	if err != nil {
		log.Warningf("cp %s to %s: %s %s", hostCpuPath, cpuiDir, out, err)
	}
	cpufreqDir := path.Join(cpuiDir, "cpufreq")
	out, err = procutils.NewRemoteCommandAsFarAsPossible("rm", "-f", cpufreqDir).Output()
	if err != nil {
		return errors.Wrapf(err, "rm -f %s: %s", cpufreqDir, out)
	}
	for _, fname := range []string{
		"affected_cpus",
		"cpuinfo_max_freq",
		"cpuinfo_min_freq",
		"cpuinfo_transition_latency",
		"related_cpus",
		"scaling_available_governors",
		"scaling_cur_freq",
		"scaling_driver",
		"scaling_governor",
		"scaling_max_freq",
		"scaling_min_freq",
		"scaling_setspeed",
	} {
		switch fname {
		case "affected_cpus", "related_cpus":
			// These list the cpu itself in the simulated view.
			val := strconv.Itoa(cpuIdx)
			cpath := path.Join(cpufreqPolicyDir, fname)
			if err := fileutils2.FilePutContents(cpath, val+"\n", false); err != nil {
				return errors.Wrapf(err, "failed write %s", cpath)
			}
		default:
			// Remaining values come from the host-level simulate config;
			// missing keys are skipped with a warning.
			val, err := cpufreqConfig.GetString(fname)
			if err != nil {
				log.Warningf("simulate cpufreq no %s", fname)
				continue
			}
			cpath := path.Join(cpufreqPolicyDir, fname)
			if err := fileutils2.FilePutContents(cpath, val+"\n", false); err != nil {
				return errors.Wrapf(err, "failed write %s", cpath)
			}
		}
	}
	// Point cpu%d/cpufreq at the fabricated policy directory.
	out, err = procutils.NewRemoteCommandAsFarAsPossible("ln", "-s", fmt.Sprintf("../cpufreq/policy%d", cpuIdx), cpufreqDir).Output()
	if err != nil {
		return errors.Wrapf(err, "ln -s ../cpufreq/policy%d %s: %s", cpuIdx, cpufreqDir, out)
	}
	return nil
}

// DeleteContainer removes the container from CRI (tolerating not-found) and
// cleans up the local bookkeeping: expected status, containers file, cpu map.
func (s *sPodGuestInstance) DeleteContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
	criId, err := s.getContainerCRIId(ctrId)
	if err != nil && errors.Cause(err) != errors.ErrNotFound {
		return nil, errors.Wrap(err, "getContainerCRIId")
	}
	if criId != "" {
		s.expectedStatus.RemoveContainer(criId)
		if err := s.getCRI().RemoveContainer(ctx, criId); err != nil && !IsContainerNotFoundError(err) {
			return nil, errors.Wrap(err, "cri.RemoveContainer")
		}
	}
	// refresh local containers file
	delete(s.containers, ctrId)
	if err := s.saveContainersFile(s.containers); err != nil {
		return nil, errors.Wrap(err, "saveContainersFile")
	}
	// cpu map cleanup is best-effort
	if err := s.getHostCPUMap().Delete(ctrId); err != nil {
		log.Warningf("delete container %s cpu map: %v", ctrId, err)
	}
	return nil, nil
}

// GetContainerStatus reports the compute-api status string and the runtime
// status of the container.
func (s *sPodGuestInstance) GetContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error) {
	return s.getContainerStatus(ctx, ctrId)
}

// getContainerStatus translates the CRI container state into a compute-api
// status, upgrading RUNNING to PROBING when the spec has probes, and EXITED
// with a non-zero code to CRASH_LOOP_BACK_OFF unless the stop was internal.
func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error) {
	criId, err := s.getContainerCRIId(ctrId)
	if err != nil {
		if errors.Cause(err) == errors.ErrNotFound {
			// not found, already stopped
			return computeapi.CONTAINER_STATUS_EXITED, nil, nil
		}
		return "", nil, errors.Wrapf(err, "get container cri_id by %s", ctrId)
	}
	resp, err := s.getCRI().ContainerStatus(ctx, criId)
	if err != nil {
		if IsContainerNotFoundError(err) {
			return computeapi.CONTAINER_STATUS_EXITED, nil, nil
		}
		return "", nil, errors.Wrap(err, "cri.ContainerStatus")
	}
	cs := runtime.ToContainerStatus(resp.Status, "containerd")
	status := computeapi.CONTAINER_STATUS_UNKNOWN
	switch resp.Status.State {
	case runtimeapi.ContainerState_CONTAINER_CREATED:
		status = computeapi.CONTAINER_STATUS_CREATED
	case runtimeapi.ContainerState_CONTAINER_RUNNING:
		status = computeapi.CONTAINER_STATUS_RUNNING
	case runtimeapi.ContainerState_CONTAINER_EXITED:
		status = computeapi.CONTAINER_STATUS_EXITED
	case runtimeapi.ContainerState_CONTAINER_UNKNOWN:
		status = computeapi.CONTAINER_STATUS_UNKNOWN
	}
	if status == computeapi.CONTAINER_STATUS_RUNNING {
		ctr := s.GetContainerById(ctrId)
		if ctr == nil {
			return "", cs, errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId)
		}
		if ctr.Spec.NeedProbe() {
			status = computeapi.CONTAINER_STATUS_PROBING
		}
	}
	if status == computeapi.CONTAINER_STATUS_EXITED && resp.Status.ExitCode != 0 {
		// An unexpected non-zero exit is treated as a crash loop.
		if _, isInternalStopped := s.IsInternalStopped(criId); !isInternalStopped {
			status = computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF
		}
	}
	return status, cs, nil
}

// MarkContainerProbeDirty flags the container for re-probing when its status
// is PROBING.
func (s *sPodGuestInstance) MarkContainerProbeDirty(ctrStatus string, ctrId string, reason string) {
	s.markContainerProbeDirty(ctrStatus, ctrId,
		reason)
}

// markContainerProbeDirty registers the container with the probe manager as
// dirty when (and only when) its current status is PROBING.
func (s *sPodGuestInstance) markContainerProbeDirty(status, ctrId string, reason string) {
	if status == computeapi.CONTAINER_STATUS_PROBING {
		reason = fmt.Sprintf("status is probing: %s", reason)
		s.getProbeManager().SetDirtyContainer(ctrId, reason)
		s.getProbeManager().AddPod(s)
	}
}

// SyncContainerStatus fetches the current container status, marks the probe
// state dirty when needed, and returns the status (with start time and
// restart count when available) as JSON.
func (s *sPodGuestInstance) SyncContainerStatus(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
	status, cs, err := s.getContainerStatus(ctx, ctrId)
	if err != nil {
		return nil, errors.Wrap(err, "get container status")
	}
	s.markContainerProbeDirty(status, ctrId, "after syncing status")
	resp := computeapi.ContainerSyncStatusResponse{
		Status: status,
	}
	if cs != nil {
		resp.StartedAt = cs.StartedAt
		resp.RestartCount = cs.RestartCount
	}
	return jsonutils.Marshal(resp), nil
}

// PullImage pulls the container image, honoring IfNotPresent by first asking
// CRI whether the image already exists locally.
func (s *sPodGuestInstance) PullImage(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) {
	policy := input.PullPolicy
	if policy == apis.ImagePullPolicyIfNotPresent || policy == "" {
		// check if image is presented
		img, err := s.getCRI().ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
			Image: &runtimeapi.ImageSpec{
				Image: input.Image,
			},
		})
		if err != nil {
			return nil, errors.Wrapf(err, "cri.ImageStatus %s", input.Image)
		}
		if img.Image != nil {
			log.Infof("image %s already exists, skipping pulling it when policy is %s", input.Image, policy)
			return jsonutils.Marshal(&runtimeapi.PullImageResponse{
				ImageRef: img.Image.Id,
			}), nil
		}
	}
	return s.pullImageByCtrCmd(ctx, userCred, ctrId, input)
	// return s.pullImageByCRI(ctx, userCred, ctrId, input)
}

// pullImageByCtrCmd pulls the image with the containerd ctr command line,
// which can fall back between https and http endpoints.
func (s *sPodGuestInstance) pullImageByCtrCmd(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) {
	if err := PullContainerdImage(input); err != nil {
		return nil, errors.Wrap(err, "PullContainerdImage with https and http")
	}
	return jsonutils.Marshal(&runtimeapi.PullImageResponse{
		ImageRef: input.Image,
	}), nil
}

// pullImageByCRI pulls the image through the CRI PullImage RPC, forwarding
// registry credentials when provided. Currently unused in favor of
// pullImageByCtrCmd (see PullImage).
func (s *sPodGuestInstance) pullImageByCRI(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) {
	/*podCfg, err := s.getPodSandboxConfig()
	if err != nil {
		return nil, errors.Wrap(err, "get pod sandbox config")
	}*/
	req := &runtimeapi.PullImageRequest{
		Image: &runtimeapi.ImageSpec{
			Image: input.Image,
		},
		// SandboxConfig: podCfg,
	}
	if input.Auth != nil {
		authCfg := &runtimeapi.AuthConfig{
			Username:      input.Auth.Username,
			Password:      input.Auth.Password,
			Auth:          input.Auth.Auth,
			ServerAddress: input.Auth.ServerAddress,
			IdentityToken: input.Auth.IdentityToken,
			RegistryToken: input.Auth.RegistryToken,
		}
		req.Auth = authCfg
	}
	resp, err := s.getCRI().PullImage(ctx, req)
	if err != nil {
		return nil, errors.Wrapf(err, "cri.PullImage %s", input.Image)
	}
	return jsonutils.Marshal(resp), nil
}

// SaveVolumeMountToImage mounts the given volume, tars its content into a
// tgz and uploads the archive to glance as an image, unmounting and removing
// the temporary archive afterwards.
func (s *sPodGuestInstance) SaveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string) (jsonutils.JSONObject, error) {
	vol := input.VolumeMount
	drv := volume_mount.GetDriver(vol.Type)
	if err := drv.Mount(s, ctrId, vol); err != nil {
		return nil, errors.Wrapf(err, "mount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
	}
	defer func() {
		if err := drv.Unmount(s, ctrId, vol); err != nil {
			log.Warningf("unmount volume %s: %v", jsonutils.Marshal(vol), err)
		}
	}()
	hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
	if err != nil {
		return nil, errors.Wrapf(err, "get runtime host mount path of %s", jsonutils.Marshal(vol))
	}
	// 1. tar hostPath to tgz
	imgPath, originalSizeBytes, err := s.tarGzDir(input, ctrId, hostPath)
	if err != nil {
		return nil, errors.Wrapf(err, "tar and zip directory %s", hostPath)
	}
	defer func() {
		out, err := procutils.NewRemoteCommandAsFarAsPossible("rm", "-f", imgPath).Output()
		if err != nil {
			log.Warningf("rm -f %s: %s", imgPath, out)
		}
	}()
	// 2. upload target tgz to glance
	if err := s.saveTarGzToGlance(ctx, input, imgPath, originalSizeBytes); err != nil {
		return nil, errors.Wrapf(err, "saveTarGzToGlance: %s", imgPath)
	}
	return nil, nil
}

// shellQuote wraps s in single quotes, escaping any embedded single quotes
// so the result is safe to embed in a sh -c command string.
func shellQuote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "'\\''") + "'"
}

// tarGzDir archives hostPath (or the selected VolumeMountDirs inside it,
// minus ExcludePaths) into a tgz under the volumes dir and returns the
// archive path together with the uncompressed size in bytes.
func (s *sPodGuestInstance) tarGzDir(input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string, hostPath string) (string, int64, error) {
	fp := fmt.Sprintf("volimg-%s-ctr-%s-%d.tar.gz", input.ImageId, ctrId, input.VolumeMountIndex)
	outputFp := filepath.Join(s.GetVolumesDir(), fp)
	dirPath := "."
	if len(input.VolumeMountDirs) != 0 {
		dirPath = ""
		for _, vd := range input.VolumeMountDirs {
			dirPath = fmt.Sprintf("%s %s", dirPath, shellQuote(vd))
		}
	}
	// Compute the total size in bytes, supporting multiple dirs/files.
	var totalSize int64
	if len(input.VolumeMountDirs) == 0 {
		sizeCmd := fmt.Sprintf("du -sb %s | awk '{print $1}'", shellQuote(hostPath))
		out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", sizeCmd).Output()
		if err != nil {
			return "", 0, errors.Wrapf(err, "calculate total size: %s", out)
		}
		outStr := strings.TrimSpace(string(out))
		totalSize, err = strconv.ParseInt(outStr, 10, 64)
		if err != nil {
			return "", 0, errors.Wrapf(err, "parse total size: %s", outStr)
		}
	} else {
		for _, d := range input.VolumeMountDirs {
			// Quote so dirs/files containing spaces still work.
			sizeCmd := fmt.Sprintf("du -sb %s | awk '{print $1}'", shellQuote(d))
			cmd := fmt.Sprintf("cd %s && %s", shellQuote(hostPath), sizeCmd)
			out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output()
			if err != nil {
				return "", 0, errors.Wrapf(err, "calculate total size for %s: %s", d, out)
			}
			outStr := strings.TrimSpace(string(out))
			sz, err := strconv.ParseInt(outStr, 10, 64)
			if err != nil {
				return "", 0, errors.Wrapf(err, "parse total size for %s: %s", d, outStr)
			}
			totalSize += sz
		}
	}
	// Subtract the sizes of excludePaths.
	for _, exclude := range input.ExcludePaths {
		// exclude paths are relative to hostPath
		sizeCmd := fmt.Sprintf("du -sb %s 2>/dev/null | awk '{print $1}'", shellQuote(exclude))
		cmd := fmt.Sprintf("cd %s && %s", shellQuote(hostPath), sizeCmd)
		out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output()
		if err != nil {
			// Ignore errors for non-existent exclude paths; continue with the next one.
			continue
		}
		outStr := strings.TrimSpace(string(out))
		if outStr == "" {
			continue
		}
		excludeSize, err := strconv.ParseInt(outStr, 10, 64)
		if err != nil {
			// Ignore parse failures; continue with the next one.
			continue
		}
		totalSize -= excludeSize
		// Keep totalSize non-negative.
		if totalSize < 0 {
			totalSize = 0
		}
	}
	baseCmd := "tar -czf"
	// Append the --exclude options.
	for _, exclude := range input.ExcludePaths {
		baseCmd = fmt.Sprintf("%s --exclude=%s", baseCmd, shellQuote(exclude))
	}
	cmd := fmt.Sprintf("%s %s -C %s %s", baseCmd, shellQuote(outputFp), shellQuote(hostPath), dirPath)
	if input.VolumeMountPrefix != "" {
		cmd += fmt.Sprintf(" --transform 's,^,%s/,'", input.VolumeMountPrefix)
	}
	if out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output(); err != nil {
		return "", 0, errors.Wrapf(err, "%s: %s", cmd, out)
	}
	return outputFp, totalSize, nil
}

// saveTarGzToGlance uploads the archive at imgPath to glance under
// input.ImageId, recording the uncompressed size (in MB) as min_disk.
func (s *sPodGuestInstance) saveTarGzToGlance(ctx context.Context, input *hostapi.ContainerSaveVolumeMountToImageInput, imgPath string, originalSizeBytes int64) error {
	f, err := os.Open(imgPath)
	if err != nil {
		return err
	}
	defer f.Close()
	finfo, err := f.Stat()
	if err != nil {
		return err
	}
	size := finfo.Size()
	// Convert to MB.
	originalSizeMB := originalSizeBytes / (1024 * 1024)
	var params = jsonutils.NewDict()
	params.Set("image_id", jsonutils.NewString(input.ImageId))
	params.Set("min_disk", jsonutils.NewInt(originalSizeMB))
	if _, err := imagemod.Images.Upload(hostutils.GetImageSession(ctx), params, f, size); err != nil {
		return errors.Wrap(err, "upload image")
	}
	return err
}

// ExecContainer issues an Exec request against CRI and returns the streaming
// URL the client should attach to. With a TTY, stderr is multiplexed into
// stdout, so it is disabled on the request.
func (s *sPodGuestInstance) ExecContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecInput) (*url.URL, error) {
	rCli := s.getCRI().GetRuntimeClient()
	criId, err := s.getContainerCRIId(ctrId)
	if err != nil {
		return nil, errors.Wrap(err, "get container cri id")
	}
	stderr := true
	if input.Tty {
		stderr = false
	}
	req := &runtimeapi.ExecRequest{
		ContainerId: criId,
		Cmd:         input.Command,
		Tty:         input.Tty,
		Stdin:       true,
		Stdout:      true,
		Stderr:      stderr,
	}
	if input.SetIO {
		req.Stdin = input.Stdin
		req.Stdout = input.Stdout
	}
	resp, err := rCli.Exec(ctx, req)
	if err != nil {
		return nil, errors.Wrap(err, "exec")
	}
	return url.Parse(resp.Url)
}

// mountDevShm mounts a tmpfs of mb megabytes at the container's shm dir,
// returning the path (empty when already mounted).
func (s *sPodGuestInstance) mountDevShm(input *hostapi.ContainerCreateInput, mb int) (string, error) {
	shmPath := s.getContainerShmDir(input.Name)
	if !fileutils2.Exists(shmPath) {
		out, err :=
procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", shmPath).Output() if err != nil { return "", errors.Wrapf(err, "mkdir -p %s: %s", shmPath, out) } } if err := procutils.NewRemoteCommandAsFarAsPossible("mountpoint", shmPath).Run(); err == nil { log.Warningf("mountpoint %s is already mounted", shmPath) return "", nil } out, err := procutils.NewRemoteCommandAsFarAsPossible("mount", "-t", "tmpfs", "-o", fmt.Sprintf("size=%dM", mb), "tmpfs", shmPath).Output() if err != nil { return "", errors.Wrapf(err, "mount tmpfs %s: %s", shmPath, out) } return shmPath, nil } func (s *sPodGuestInstance) unmountDevShm(containerName string) error { shmPath := s.getContainerShmDir(containerName) if err := procutils.NewRemoteCommandAsFarAsPossible("mountpoint", shmPath).Run(); err != nil { return nil } out, err := procutils.NewRemoteCommandAsFarAsPossible("umount", shmPath).Output() if err != nil { return errors.Wrapf(err, "mount tmpfs %s: %s", shmPath, out) } return nil } func (s *sPodGuestInstance) DoSnapshot(ctx context.Context, params *SDiskSnapshot) (jsonutils.JSONObject, error) { input := params.BackupDiskConfig.BackupAsTar if input.ContainerId == "" { return nil, httperrors.NewMissingParameterError("missing backup_disk_config.backup_as_tar.container_id") } isCtrRunning, err := s.IsContainerRunning(ctx, input.ContainerId) if err != nil { return nil, errors.Wrapf(err, "check container %s running status", input.ContainerId) } if params.BackupDiskConfig == nil { return nil, httperrors.NewMissingParameterError("missing backup_disk_config") } if params.BackupDiskConfig.BackupAsTar == nil { return nil, httperrors.NewMissingParameterError("missing backup_disk_config.backup_as_tar") } vols := s.getContainerVolumeMountsByDiskId(input.ContainerId, params.Disk.GetId()) if len(vols) == 0 { return nil, httperrors.NewNotFoundError("not found container volume_mount by container_id %s and disk_id %s", input.ContainerId, params.Disk.GetId()) } tmpBackRootDir, err := 
storageman.EnsureBackupDir() if err != nil { return nil, errors.Wrap(err, "EnsureBackupDir") } defer storageman.CleanupDirOrFile(tmpBackRootDir) povTmpBackRootDir := []string{} for _, vol := range vols { drv := volume_mount.GetDriver(vol.Type) if err := drv.Mount(s, input.ContainerId, vol); err != nil { return nil, errors.Wrapf(err, "mount %s to %s", input.ContainerId, jsonutils.Marshal(vol)) } mntPath, err := drv.GetRuntimeMountHostPath(s, input.ContainerId, vol) if err != nil { return nil, errors.Wrapf(err, "GetRuntimeMountHostPath containerId: %s, vol: %s", input.ContainerId, jsonutils.Marshal(vol)) } isMntPathFile := false targetBindMntPath := tmpBackRootDir if vol.Disk.SubDirectory != "" { // mkdir tmpBackRootDir/subdirectory targetBindMntPath = filepath.Join(tmpBackRootDir, vol.Disk.SubDirectory) } if vol.Disk.StorageSizeFile != "" { targetBindMntPath = filepath.Join(tmpBackRootDir, vol.Disk.StorageSizeFile) isMntPathFile = true } if isMntPathFile { if out, err := procutils.NewRemoteCommandAsFarAsPossible("touch", targetBindMntPath).Output(); err != nil { return nil, errors.Wrapf(err, "touch %s: %s", targetBindMntPath, out) } } else { if err := volume_mount.EnsureDir(targetBindMntPath); err != nil { return nil, errors.Wrap(err, "ensure dir") } } // do bind mount if err := mountutils.MountBind(mntPath, targetBindMntPath); err != nil { return nil, errors.Wrapf(err, "bind mount %s to %s", mntPath, targetBindMntPath) } // process post overlay diskDrv := drv.(disk.IVolumeMountDisk) for _, pov := range vol.Disk.PostOverlay { // bind mount post overlay dirs to tmpBackRootDir upperDir, err := diskDrv.GetPostOverlayRootUpperDir(s, vol, input.ContainerId, pov) if err != nil { return nil, errors.Wrapf(err, "get post overlay root upper dir: %s", jsonutils.Marshal(pov)) } workDir, err := diskDrv.GetPostOverlayRootWorkDir(s, vol, input.ContainerId) if err != nil { return nil, errors.Wrapf(err, "get post overlay root upper dir: %s", jsonutils.Marshal(pov)) } 
hostDiskRootPath, _ := diskDrv.GetHostDiskRootPath(s, vol) for _, srcDir := range []string{upperDir, workDir} { targetSubDir := strings.TrimPrefix(srcDir, hostDiskRootPath) targetPovBindMntPath := filepath.Join(tmpBackRootDir, targetSubDir) if err := mountutils.MountBind(srcDir, targetPovBindMntPath); err != nil { return nil, errors.Wrap(err, "bind mount post overlay dir") } povTmpBackRootDir = append(povTmpBackRootDir, targetPovBindMntPath) } } } deferUmount := func() error { for _, vol := range vols { // unbind mount for _, povPath := range povTmpBackRootDir { if err := mountutils.Unmount(povPath, false); err != nil { return errors.Wrapf(err, "umount bind point %s", povTmpBackRootDir) } } targetBindMntPath := filepath.Join(tmpBackRootDir, vol.Disk.SubDirectory) if vol.Disk.StorageSizeFile != "" { targetBindMntPath = filepath.Join(tmpBackRootDir, vol.Disk.StorageSizeFile) } if err := mountutils.Unmount(targetBindMntPath, false); err != nil { return errors.Wrapf(err, "umount bind point %s", targetBindMntPath) } } if !isCtrRunning && !s.IsRunning() { for _, vol := range vols { drv := volume_mount.GetDriver(vol.Type) if err := drv.Unmount(s, input.ContainerId, vol); err != nil { return errors.Wrapf(err, "unmount %s to %s", input.ContainerId, jsonutils.Marshal(vol)) } } } else { log.Infof("container %s/%s is running, so skipping unmount volumes", s.GetId(), input.ContainerId) } return nil } defer func() { if err := deferUmount(); err != nil { log.Errorf("deferUmount after snapshot error: %s", err) } else { log.Infof("defer umount success") } }() snapshotPath, err := s.createSnapshot(params, tmpBackRootDir) if err != nil { return nil, errors.Wrap(err, "create snapshot") } res := jsonutils.NewDict() res.Set("location", jsonutils.NewString(snapshotPath)) return res, nil } func (s *sPodGuestInstance) createSnapshot(params *SDiskSnapshot, hostPath string) (string, error) { d := params.Disk snapshotDir := d.GetSnapshotDir() log.Infof("snapshotDir of LocalDisk %s: %s", 
d.GetId(), snapshotDir) if !fileutils2.Exists(snapshotDir) { output, err := procutils.NewCommand("mkdir", "-p", snapshotDir).Output() if err != nil { log.Errorf("mkdir %s failed: %s", snapshotDir, output) return "", errors.Wrapf(err, "mkdir %s failed: %s", snapshotDir, output) } } snapshotPath := s.getSnapshotPath(d, params.SnapshotId) // tar hostPath to snapshotPath input := params.BackupDiskConfig.BackupAsTar if err := s.tarHostDir(hostPath, snapshotPath, input.IncludeFiles, input.ExcludeFiles, input.IgnoreNotExistFile, input.IncludePatterns); err != nil { return "", errors.Wrapf(err, "tar host dir %s to %s", hostPath, snapshotPath) } return snapshotPath, nil } func (s *sPodGuestInstance) tarHostDir(srcDir, targetPath string, includeFiles, excludeFiles []string, ignoreNotExistFile bool, includePatterns []string) error { baseCmd := "tar" filterNotExistFiles := func(files []string) []string { result := []string{} for i := range files { if fileutils2.Exists(filepath.Join(srcDir, files[i])) { result = append(result, files[i]) } else { log.Warningf("tar path doesn't exist: %q", filepath.Join(srcDir, files[i])) } } return result } if ignoreNotExistFile { includeFiles = filterNotExistFiles(includeFiles) excludeFiles = filterNotExistFiles(excludeFiles) } // 如果有 includePatterns,使用 find -name 找出匹配的路径,添加到 includeFiles 中 if len(includePatterns) > 0 { findPatterns := []string{} for _, pattern := range includePatterns { // 转义特殊字符,但保留 glob 通配符 findPatterns = append(findPatterns, fmt.Sprintf("-name %s", shellQuote(pattern))) } // 构建 find 命令来查找匹配的路径 findCmd := fmt.Sprintf("find . 
\\( %s \\)", strings.Join(findPatterns, " -o ")) cmd := fmt.Sprintf("cd %s && %s", shellQuote(srcDir), findCmd) log.Infof("[%s] find cmd: %s", s.GetName(), cmd) out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output() if err != nil { return errors.Wrapf(err, "find matching paths: %s", out) } // 解析 find 的输出,将匹配的路径添加到 includeFiles 中 // find 输出的路径格式为 ./path,需要去掉开头的 ./ lines := strings.Split(strings.TrimSpace(string(out)), "\n") for _, line := range lines { line = strings.TrimSpace(line) if line == "" { continue } // 去掉开头的 ./ if strings.HasPrefix(line, "./") { line = line[2:] } // 添加到 includeFiles 中 includeFiles = append(includeFiles, line) } } // 没有 includePatterns 时,使用原来的逻辑 // 添加 --exclude 选项 for _, exclude := range excludeFiles { baseCmd = fmt.Sprintf("%s --exclude=%s", baseCmd, shellQuote(exclude)) } includeStr := "." if len(includeFiles) > 0 { for i := range includeFiles { includeFiles[i] = shellQuote(includeFiles[i]) } includeStr = strings.Join(includeFiles, " ") } cmd := fmt.Sprintf("%s --ignore-failed-read -cf %s -C %s %s", baseCmd, shellQuote(targetPath), shellQuote(srcDir), includeStr) log.Infof("[%s] tar cmd: %s", s.GetName(), cmd) if out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output(); err != nil { outErr := errors.Wrapf(err, "%s: %s", cmd, out) outMsg := strings.ToLower(outErr.Error()) // ref: https://stackoverflow.com/questions/20318852/tar-file-changed-as-we-read-it const exitStatus1 = "exit status 1" const fileChangedMsg = "file changed as we read it" const fileRemovedMsg = "file removed before we read it" const socketIgnoredMsg = "socket ignored" if strings.Contains(outMsg, exitStatus1) { for _, warningMsg := range []string{fileChangedMsg, fileRemovedMsg, socketIgnoredMsg} { if strings.Contains(outMsg, warningMsg) { log.Warningf("[%s] got some warning message when tar: %s", s.GetName(), outMsg) return nil } } } return outErr } return nil } func (s *sPodGuestInstance) getSnapshotPath(d storageman.IDisk, 
snapshotId string) string { snapshotDir := d.GetSnapshotDir() snapshotPath := path.Join(snapshotDir, fmt.Sprintf("%s.tar", snapshotId)) return snapshotPath } func (s *sPodGuestInstance) DeleteSnapshot(ctx context.Context, params *SDeleteDiskSnapshot) (jsonutils.JSONObject, error) { snapshotPath := s.getSnapshotPath(params.Disk, params.DeleteSnapshot) out, err := procutils.NewCommand("rm", "-f", snapshotPath).Output() if err != nil { return nil, errors.Wrapf(err, "rm -f %s: %s", snapshotPath, out) } res := jsonutils.NewDict() res.Set("deleted", jsonutils.JSONTrue) return res, nil } func (s *sPodGuestInstance) doOnlineResizeDisk(ctx context.Context, disk storageman.IDisk, sizeMB int64) { drv, err := disk.GetContainerStorageDriver() if err != nil { hostutils.TaskFailed(ctx, fmt.Sprintf("get disk storage driver %s", err)) return } partDev, found, err := drv.CheckConnect(disk.GetPath()) if err != nil { hostutils.TaskFailed(ctx, fmt.Sprintf("disk check connect %s", err)) return } if !found { hostutils.TaskFailed(ctx, fmt.Sprintf("online resize but loop device not connected")) return } if err := disk.PreResize(ctx, sizeMB); err != nil { hostutils.TaskFailed(ctx, fmt.Sprintf("PreResize failed %s", err)) return } diskInfo := jsonutils.NewDict() diskInfo.Set("size", jsonutils.NewInt(sizeMB)) diskInfo.Set("loop_part_dev", jsonutils.NewString(partDev)) resizeDiskInfo := &storageman.SDiskResizeInput{ DiskInfo: diskInfo, } res, err := disk.Resize(ctx, resizeDiskInfo) if err != nil { hostutils.TaskFailed(ctx, fmt.Sprintf("PreResize failed %s", err)) return } hostutils.TaskComplete(ctx, res) } func (s *sPodGuestInstance) OnlineResizeDisk(ctx context.Context, disk storageman.IDisk, sizeMB int64) { go s.doOnlineResizeDisk(ctx, disk, sizeMB) } func (s *sPodGuestInstance) ContainerExecSync(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecSyncInput) (jsonutils.JSONObject, error) { ctrCriId, err := s.getContainerCRIId(ctrId) if err != 
nil { return nil, errors.Wrap(err, "get container cri id") } cli := s.getCRI().GetRuntimeClient() resp, err := cli.ExecSync(ctx, &runtimeapi.ExecSyncRequest{ ContainerId: ctrCriId, Cmd: input.Command, Timeout: input.Timeout, }) if err != nil { return nil, errors.Wrapf(err, "exec sync %#v to %s", input.Command, ctrCriId) } return jsonutils.Marshal(&computeapi.ContainerExecSyncResponse{ Stdout: string(resp.Stdout), Stderr: string(resp.Stderr), ExitCode: resp.ExitCode, }), nil } func (s *sPodGuestInstance) ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error { // Do a zero-byte write to stdout before handing off to the container runtime. // This ensures at least one Write call is made to the writer when copying starts, // even if we then block waiting for log output from the container. if _, err := stdout.Write([]byte{}); err != nil { return err } ctrCriId, err := s.getContainerCRIId(ctrId) if err != nil { return errors.Wrapf(err, "get container cri id %s", ctrId) } resp, err := s.getCRI().ContainerStatus(ctx, ctrCriId) if err != nil { return errors.Wrapf(err, "get container status %s", ctrCriId) } logPath := resp.GetStatus().GetLogPath() opts := logs.NewLogOptions(input, time.Now()) return logs.ReadLogs(ctx, logPath, ctrCriId, opts, s.getCRI().GetRuntimeClient(), stdout, stderr) } func (s *sPodGuestInstance) CommitContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCommitInput) (jsonutils.JSONObject, error) { criId, err := s.getContainerCRIId(ctrId) if err != nil { return nil, errors.Wrap(err, "get container cri id") } // 1. commit tool := NewContainerdNerdctl() imgRepo, err := tool.Commit(criId, &nerdctl.CommitOptions{Repository: input.Repository}) if err != nil { return nil, errors.Wrapf(err, "commit container %s image", ctrId) } log.Infof("container %s was commited to %s", ctrId, imgRepo) // 2. 
push to repository if err := PushContainerdImage(&hostapi.ContainerPushImageInput{ Image: imgRepo, Auth: input.Auth, }); err != nil { return nil, errors.Wrapf(err, "push container %s image", ctrId) } log.Infof("container %s was pushed to %s", ctrId, imgRepo) return jsonutils.Marshal(map[string]interface{}{ "image_repository": imgRepo, }), nil } func (s *sPodGuestInstance) AddContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountAddPostOverlayInput) error { isRunning, err := s.IsContainerRunning(ctx, ctrId) if err != nil { return errors.Wrap(err, "check container is running") } if !isRunning { return nil } ctrSpec := s.GetContainerById(ctrId) vol := ctrSpec.Spec.VolumeMounts[input.Index] drv := volume_mount.GetDriver(vol.Type) diskDrv, ok := drv.(disk.IVolumeMountDisk) if !ok { return errors.Errorf("invalid disk volume driver of %s", vol.Type) } return diskDrv.MountPostOverlays(s, ctrId, vol, input.PostOverlay) } func (s *sPodGuestInstance) RemoveContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountRemovePostOverlayInput) error { ctrSpec := s.GetContainerById(ctrId) vol := ctrSpec.Spec.VolumeMounts[input.Index] drv := volume_mount.GetDriver(vol.Type) diskDrv, ok := drv.(disk.IVolumeMountDisk) if !ok { return errors.Errorf("invalid disk volume driver of %s", vol.Type) } // drv.Mount 不会重复挂载,支持重复调用 if err := drv.Mount(s, ctrId, vol); err != nil { return errors.Wrapf(err, "mount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId) } return diskDrv.UnmountPostOverlays(s, ctrId, vol, input.PostOverlay, input.UseLazy, input.ClearLayers) } func (s *sPodGuestInstance) SetNicDown(mac string) error { // null operation for pod, QIUJIAN, 20260203 return nil } func (s *sPodGuestInstance) SetNicUp(nic *desc.SGuestNetwork) error { // null operation for pod, QIUJIAN, 20260203 return nil }