| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package guestman
- import (
- "context"
- "fmt"
- "path"
- "strconv"
- "strings"
- "time"
- "github.com/shirou/gopsutil/v3/disk"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- computeapi "yunion.io/x/onecloud/pkg/apis/compute"
- hostapi "yunion.io/x/onecloud/pkg/apis/host"
- "yunion.io/x/onecloud/pkg/hostman/container/volume_mount"
- "yunion.io/x/onecloud/pkg/hostman/options"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/util/cgrouputils"
- "yunion.io/x/onecloud/pkg/util/fileutils2"
- "yunion.io/x/onecloud/pkg/util/pod/image"
- "yunion.io/x/onecloud/pkg/util/pod/nerdctl"
- )
- func IsContainerNotFoundError(err error) bool {
- if errors.Cause(err) == errors.ErrNotFound {
- return true
- }
- for _, errMsg := range []string{
- "NotFound",
- "not found",
- } {
- if strings.Contains(err.Error(), errMsg) {
- return true
- }
- }
- return false
- }
- func GetContainerdConnectionInfo() (string, string) {
- addr := options.HostOptions.ContainerRuntimeEndpoint
- addr = strings.TrimPrefix(addr, "unix://")
- namespace := "k8s.io"
- return addr, namespace
- }
- func NewContainerdImageTool() image.ImageTool {
- addr, namespace := GetContainerdConnectionInfo()
- return image.NewImageTool(addr, namespace)
- }
- func NewContainerdNerdctl() nerdctl.Nerdctl {
- addr, namespace := GetContainerdConnectionInfo()
- return nerdctl.NewNerdctl(addr, namespace)
- }
- func PullContainerdImage(input *hostapi.ContainerPullImageInput) error {
- opt := &image.PullOptions{
- RepoCommonOptions: image.RepoCommonOptions{
- SkipVerify: true,
- },
- }
- if input.Auth != nil {
- opt.Username = input.Auth.Username
- opt.Password = input.Auth.Password
- }
- imgTool := NewContainerdImageTool()
- errs := make([]error, 0)
- _, err := imgTool.Pull(input.Image, opt)
- if err != nil {
- // try http protocol
- errs = append(errs, errors.Errorf("pullImageByCtrCmd by https: %s", trimPullImageError(err.Error())))
- opt.PlainHttp = true
- log.Infof("try pull image %s by http", input.Image)
- if _, err := imgTool.Pull(input.Image, opt); err != nil {
- errs = append(errs, errors.Errorf("pullImageByCtrCmd by http: %s", trimPullImageError(err.Error())))
- return errors.NewAggregate(errs)
- }
- }
- return nil
- }
// trimPullImageError removes containerd pull progress output from an error
// message. It drops every line containing the ANSI fragment "[0m" and every
// summary line containing both "elapsed:" and "total:". The remaining lines
// are rejoined with "\n".
func trimPullImageError(err string) string {
	isProgressLine := func(line string) bool {
		if strings.Contains(line, "[0m") {
			return true
		}
		return strings.Contains(line, "elapsed:") && strings.Contains(line, "total:")
	}
	var kept []string
	for _, line := range strings.Split(err, "\n") {
		if !isProgressLine(line) {
			kept = append(kept, line)
		}
	}
	return strings.Join(kept, "\n")
}
- func PushContainerdImage(input *hostapi.ContainerPushImageInput) error {
- opt := &image.PushOptions{
- RepoCommonOptions: image.RepoCommonOptions{
- SkipVerify: true,
- },
- }
- if input.Auth != nil {
- opt.Username = input.Auth.Username
- opt.Password = input.Auth.Password
- }
- imgTool := NewContainerdImageTool()
- err := imgTool.Push(input.Image, opt)
- errs := make([]error, 0)
- if err != nil {
- // try http protocol
- errs = append(errs, errors.Wrap(err, "pushImageByCtrCmd: %s"))
- opt.PlainHttp = true
- log.Infof("try push image %s by http", input.Image)
- if err := imgTool.Push(input.Image, opt); err != nil {
- errs = append(errs, errors.Wrapf(err, "pushImageByCtrCmd by http"))
- return errors.NewAggregate(errs)
- }
- }
- return nil
- }
// ContainerVolumeKey is the map key used by GetVolumeMountUsages: a container
// id paired with the host path resolved for one of its volume mounts.
type ContainerVolumeKey struct {
	Id, HostPath string
}
- func (s *sPodGuestInstance) GetVolumeMountUsages() (map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage, error) {
- errs := []error{}
- result := make(map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage)
- for ctrId, vols := range s.getContainerVolumeMounts() {
- for i := range vols {
- vol := vols[i]
- drv, ok := volume_mount.GetDriver(vol.Type).(volume_mount.IUsageVolumeMount)
- if !ok {
- continue
- }
- vu, err := s.getVolumeMountUsage(drv, ctrId, vol)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "get container %s %s volume usage: %s", ctrId, drv.GetType(), jsonutils.Marshal(vol)))
- } else {
- result[ContainerVolumeKey{
- Id: ctrId,
- HostPath: vu.HostPath,
- }] = vu
- }
- }
- }
- return result, errors.NewAggregate(errs)
- }
- func (s *sPodGuestInstance) getVolumeMountUsage(drv volume_mount.IUsageVolumeMount, ctrId string, vol *hostapi.ContainerVolumeMount) (*volume_mount.ContainerVolumeMountUsage, error) {
- hp, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
- if err != nil {
- return nil, errors.Wrap(err, "GetRuntimeMountHostPath")
- }
- us, err := disk.Usage(hp)
- if err != nil {
- return nil, errors.Wrapf(err, "disk.Usage of %s", hp)
- }
- usage := &volume_mount.ContainerVolumeMountUsage{
- Id: ctrId,
- MountPath: vol.MountPath,
- HostPath: hp,
- VolumeType: string(drv.GetType()),
- Usage: us,
- Tags: make(map[string]string),
- }
- drv.InjectUsageTags(usage, vol)
- return usage, nil
- }
- func (s *sPodGuestInstance) RestartLocalPodAndContainers(ctx context.Context, cred mcclient.TokenCredential) {
- s.manager.GuestStartWorker.Run(newLocalPodRestartTask(ctx, cred, s), nil, nil)
- }
// localPodRestartTask is the worker task queued by
// RestartLocalPodAndContainers; its Run method stops all of the pod's
// containers, starts the pod again and then starts the containers.
type localPodRestartTask struct {
	// ctx is captured at enqueue time and passed to every stop/start call
	// made by Run.
	ctx context.Context
	// userCred is the credential passed to the stop/start calls.
	userCred mcclient.TokenCredential
	// pod is the pod instance being restarted.
	pod *sPodGuestInstance
}
- func newLocalPodRestartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *localPodRestartTask {
- return &localPodRestartTask{
- ctx: ctx,
- userCred: userCred,
- pod: pod,
- }
- }
- func (t *localPodRestartTask) Run() {
- log.Infof("restart pod and containers locally (%s/%s)", t.pod.Id, t.pod.GetName())
- for _, ctr := range t.pod.GetContainers() {
- log.Infof("stop container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
- if _, err := t.pod.StopContainer(t.ctx, t.userCred, ctr.Id, &hostapi.ContainerStopInput{
- Timeout: 0,
- ShmSizeMB: ctr.Spec.ShmSizeMB,
- ContainerName: ctr.Name,
- }); err != nil {
- log.Errorf("stop container %s error: %v", ctr.Name, err)
- }
- }
- if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
- log.Errorf("start pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
- return
- }
- for _, ctr := range t.pod.GetContainers() {
- log.Infof("start container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
- if _, err := t.pod.StartLocalContainer(t.ctx, t.userCred, ctr.Id); err != nil {
- log.Errorf("start container %s err: %s", ctr.Id, err.Error())
- }
- }
- t.pod.SyncStatus("sync status after pod and containers restart locally", "")
- }
- func (t *localPodRestartTask) Dump() string {
- return fmt.Sprintf("pod restart task %s/%s", t.pod.GetId(), t.pod.GetName())
- }
- func GetPodStatusByContainerStatus(status string, cStatus string, isPrimary bool) string {
- if cStatus == computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF && isPrimary {
- status = computeapi.POD_STATUS_CRASH_LOOP_BACK_OFF
- }
- if cStatus == computeapi.CONTAINER_STATUS_EXITED && status != computeapi.VM_READY {
- // status = computeapi.POD_STATUS_CONTAINER_EXITED
- if isPrimary {
- status = computeapi.VM_READY
- }
- }
- return status
- }
// SCpuFreqRealTimeSimulateManager periodically samples each pod container's
// cgroup cpuacct.usage and writes a simulated current CPU frequency for the
// container, interpolated between cpufreqMin and cpufreqMax.
type SCpuFreqRealTimeSimulateManager struct {
	// cpuacct.usage value of each container (keyed by CRI id) at the
	// previous sample.
	lastTimeCpuUsage map[string]int64
	// UnixNano timestamp of the previous sample; nil until the first one.
	lastTime *int64
	// sampling period in seconds
	interval int
	// closed by Stop to end StartSetCpuFreqSimulate
	stop chan struct{}
	// bounds of the simulated frequency range
	cpufreqMax, cpufreqMin int64
}
- func newCpuFreqRealTimeSimulateManager(intervalSecond int, cpufreqMax, cpufreqMin int64) *SCpuFreqRealTimeSimulateManager {
- return &SCpuFreqRealTimeSimulateManager{
- interval: intervalSecond,
- stop: make(chan struct{}),
- cpufreqMax: cpufreqMax,
- cpufreqMin: cpufreqMin,
- }
- }
- func (m *SCpuFreqRealTimeSimulateManager) StartSetCpuFreqSimulate() {
- log.Infof("StartSetCpuFreqSimulate")
- ticker := time.NewTicker(time.Duration(m.interval) * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- m.startSetCpuFreqSimulate()
- case <-m.stop:
- return
- }
- }
- }
- func (m *SCpuFreqRealTimeSimulateManager) Stop() {
- close(m.stop)
- }
- func (m *SCpuFreqRealTimeSimulateManager) startSetCpuFreqSimulate() {
- newCpuUsage := map[string]int64{}
- startTime := time.Now().UnixNano()
- guestManager.Servers.Range(func(k, v interface{}) bool {
- pod, ok := v.(*sPodGuestInstance)
- if !ok {
- log.Errorf("is not pod instance")
- return false
- }
- cgroupRoot := path.Join(cgrouputils.GetSubModulePath("cpuacct"), PodCgroupParent())
- criIds := pod.GetPodContainerCriIds()
- for i := range criIds {
- ctr, err := pod.GetContainerByCRIId(criIds[i])
- if err != nil {
- log.Errorf("failed get %s ctrid by criId %s", pod.GetName(), criIds[i])
- continue
- }
- if ctr == nil {
- log.Errorf("found nil container desc by criId %s in pod %s", criIds[i], pod.GetName())
- continue
- }
- cpuDir := pod.getContainerSystemCpusDir(ctr.Id)
- if !fileutils2.Exists(cpuDir) {
- log.Debugf("%s %s has no cpuDir", pod.GetName(), criIds[i])
- continue
- }
- usagePath := path.Join(cgroupRoot, criIds[i], "cpuacct.usage")
- criCpuUsageStr, err := fileutils2.FileGetContents(usagePath)
- if err != nil {
- continue
- }
- criCpuUsageStr = strings.TrimSpace(criCpuUsageStr)
- criCpuUsage, err := strconv.ParseInt(criCpuUsageStr, 10, 0)
- if err != nil {
- log.Errorf("failed parse %s %s: %s", usagePath, criCpuUsageStr, err)
- continue
- }
- newCpuUsage[criIds[i]] = criCpuUsage
- if m.lastTime != nil {
- if lastTimeUsage, ok := m.lastTimeCpuUsage[criIds[i]]; ok {
- timeDiff := float64(startTime - *m.lastTime)
- usageDiff := float64(criCpuUsage - lastTimeUsage)
- cpuUsagePercent := (usageDiff / timeDiff) * 100
- freqRange := float64(m.cpufreqMax - m.cpufreqMin)
- estimatedFreq := m.cpufreqMin + int64(freqRange*cpuUsagePercent/100)
- err := pod.simulateContainerSystemCpuSetScalingCurFreq(ctr.Id, estimatedFreq)
- if err != nil {
- log.Errorf("failed set %s(%s) simulate cpufreq: %s", pod.GetId(), criIds[i], err)
- }
- }
- }
- }
- return true
- })
- m.lastTime = &startTime
- m.lastTimeCpuUsage = newCpuUsage
- }
|