  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package guestman
  15. import (
  16. "context"
  17. "fmt"
  18. "path"
  19. "strconv"
  20. "strings"
  21. "time"
  22. "github.com/shirou/gopsutil/v3/disk"
  23. "yunion.io/x/jsonutils"
  24. "yunion.io/x/log"
  25. "yunion.io/x/pkg/errors"
  26. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  27. hostapi "yunion.io/x/onecloud/pkg/apis/host"
  28. "yunion.io/x/onecloud/pkg/hostman/container/volume_mount"
  29. "yunion.io/x/onecloud/pkg/hostman/options"
  30. "yunion.io/x/onecloud/pkg/mcclient"
  31. "yunion.io/x/onecloud/pkg/util/cgrouputils"
  32. "yunion.io/x/onecloud/pkg/util/fileutils2"
  33. "yunion.io/x/onecloud/pkg/util/pod/image"
  34. "yunion.io/x/onecloud/pkg/util/pod/nerdctl"
  35. )
  36. func IsContainerNotFoundError(err error) bool {
  37. if errors.Cause(err) == errors.ErrNotFound {
  38. return true
  39. }
  40. for _, errMsg := range []string{
  41. "NotFound",
  42. "not found",
  43. } {
  44. if strings.Contains(err.Error(), errMsg) {
  45. return true
  46. }
  47. }
  48. return false
  49. }
  50. func GetContainerdConnectionInfo() (string, string) {
  51. addr := options.HostOptions.ContainerRuntimeEndpoint
  52. addr = strings.TrimPrefix(addr, "unix://")
  53. namespace := "k8s.io"
  54. return addr, namespace
  55. }
  56. func NewContainerdImageTool() image.ImageTool {
  57. addr, namespace := GetContainerdConnectionInfo()
  58. return image.NewImageTool(addr, namespace)
  59. }
  60. func NewContainerdNerdctl() nerdctl.Nerdctl {
  61. addr, namespace := GetContainerdConnectionInfo()
  62. return nerdctl.NewNerdctl(addr, namespace)
  63. }
  64. func PullContainerdImage(input *hostapi.ContainerPullImageInput) error {
  65. opt := &image.PullOptions{
  66. RepoCommonOptions: image.RepoCommonOptions{
  67. SkipVerify: true,
  68. },
  69. }
  70. if input.Auth != nil {
  71. opt.Username = input.Auth.Username
  72. opt.Password = input.Auth.Password
  73. }
  74. imgTool := NewContainerdImageTool()
  75. errs := make([]error, 0)
  76. _, err := imgTool.Pull(input.Image, opt)
  77. if err != nil {
  78. // try http protocol
  79. errs = append(errs, errors.Errorf("pullImageByCtrCmd by https: %s", trimPullImageError(err.Error())))
  80. opt.PlainHttp = true
  81. log.Infof("try pull image %s by http", input.Image)
  82. if _, err := imgTool.Pull(input.Image, opt); err != nil {
  83. errs = append(errs, errors.Errorf("pullImageByCtrCmd by http: %s", trimPullImageError(err.Error())))
  84. return errors.NewAggregate(errs)
  85. }
  86. }
  87. return nil
  88. }
  89. func trimPullImageError(err string) string {
  90. lines := strings.Split(err, "\n")
  91. filterLines := []string{}
  92. for _, line := range lines {
  93. // 过滤掉 containerd 拉取镜像的输出
  94. if strings.Contains(line, "[0m") {
  95. continue
  96. }
  97. if strings.Contains(line, "elapsed:") && strings.Contains(line, "total:") {
  98. continue
  99. }
  100. filterLines = append(filterLines, line)
  101. }
  102. return strings.Join(filterLines, "\n")
  103. }
  104. func PushContainerdImage(input *hostapi.ContainerPushImageInput) error {
  105. opt := &image.PushOptions{
  106. RepoCommonOptions: image.RepoCommonOptions{
  107. SkipVerify: true,
  108. },
  109. }
  110. if input.Auth != nil {
  111. opt.Username = input.Auth.Username
  112. opt.Password = input.Auth.Password
  113. }
  114. imgTool := NewContainerdImageTool()
  115. err := imgTool.Push(input.Image, opt)
  116. errs := make([]error, 0)
  117. if err != nil {
  118. // try http protocol
  119. errs = append(errs, errors.Wrap(err, "pushImageByCtrCmd: %s"))
  120. opt.PlainHttp = true
  121. log.Infof("try push image %s by http", input.Image)
  122. if err := imgTool.Push(input.Image, opt); err != nil {
  123. errs = append(errs, errors.Wrapf(err, "pushImageByCtrCmd by http"))
  124. return errors.NewAggregate(errs)
  125. }
  126. }
  127. return nil
  128. }
// ContainerVolumeKey uniquely identifies a container volume mount by the
// owning container id and the volume's path on the host filesystem. It is
// used as the map key of GetVolumeMountUsages results.
type ContainerVolumeKey struct {
	// Id is the id of the container owning the volume mount.
	Id string
	// HostPath is the volume's resolved path on the host.
	HostPath string
}
  133. func (s *sPodGuestInstance) GetVolumeMountUsages() (map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage, error) {
  134. errs := []error{}
  135. result := make(map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage)
  136. for ctrId, vols := range s.getContainerVolumeMounts() {
  137. for i := range vols {
  138. vol := vols[i]
  139. drv, ok := volume_mount.GetDriver(vol.Type).(volume_mount.IUsageVolumeMount)
  140. if !ok {
  141. continue
  142. }
  143. vu, err := s.getVolumeMountUsage(drv, ctrId, vol)
  144. if err != nil {
  145. errs = append(errs, errors.Wrapf(err, "get container %s %s volume usage: %s", ctrId, drv.GetType(), jsonutils.Marshal(vol)))
  146. } else {
  147. result[ContainerVolumeKey{
  148. Id: ctrId,
  149. HostPath: vu.HostPath,
  150. }] = vu
  151. }
  152. }
  153. }
  154. return result, errors.NewAggregate(errs)
  155. }
  156. func (s *sPodGuestInstance) getVolumeMountUsage(drv volume_mount.IUsageVolumeMount, ctrId string, vol *hostapi.ContainerVolumeMount) (*volume_mount.ContainerVolumeMountUsage, error) {
  157. hp, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
  158. if err != nil {
  159. return nil, errors.Wrap(err, "GetRuntimeMountHostPath")
  160. }
  161. us, err := disk.Usage(hp)
  162. if err != nil {
  163. return nil, errors.Wrapf(err, "disk.Usage of %s", hp)
  164. }
  165. usage := &volume_mount.ContainerVolumeMountUsage{
  166. Id: ctrId,
  167. MountPath: vol.MountPath,
  168. HostPath: hp,
  169. VolumeType: string(drv.GetType()),
  170. Usage: us,
  171. Tags: make(map[string]string),
  172. }
  173. drv.InjectUsageTags(usage, vol)
  174. return usage, nil
  175. }
// RestartLocalPodAndContainers schedules an asynchronous task on the guest
// start worker that stops all containers, restarts the pod and starts the
// containers again on this host.
func (s *sPodGuestInstance) RestartLocalPodAndContainers(ctx context.Context, cred mcclient.TokenCredential) {
	s.manager.GuestStartWorker.Run(newLocalPodRestartTask(ctx, cred, s), nil, nil)
}
// localPodRestartTask restarts a pod and all of its containers locally;
// it is executed by the guest start worker.
type localPodRestartTask struct {
	// ctx carries cancellation/tracing for the stop/start calls.
	ctx context.Context
	// userCred is the credential used for all pod operations.
	userCred mcclient.TokenCredential
	// pod is the instance being restarted.
	pod *sPodGuestInstance
}
  184. func newLocalPodRestartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *localPodRestartTask {
  185. return &localPodRestartTask{
  186. ctx: ctx,
  187. userCred: userCred,
  188. pod: pod,
  189. }
  190. }
  191. func (t *localPodRestartTask) Run() {
  192. log.Infof("restart pod and containers locally (%s/%s)", t.pod.Id, t.pod.GetName())
  193. for _, ctr := range t.pod.GetContainers() {
  194. log.Infof("stop container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
  195. if _, err := t.pod.StopContainer(t.ctx, t.userCred, ctr.Id, &hostapi.ContainerStopInput{
  196. Timeout: 0,
  197. ShmSizeMB: ctr.Spec.ShmSizeMB,
  198. ContainerName: ctr.Name,
  199. }); err != nil {
  200. log.Errorf("stop container %s error: %v", ctr.Name, err)
  201. }
  202. }
  203. if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
  204. log.Errorf("start pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
  205. return
  206. }
  207. for _, ctr := range t.pod.GetContainers() {
  208. log.Infof("start container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
  209. if _, err := t.pod.StartLocalContainer(t.ctx, t.userCred, ctr.Id); err != nil {
  210. log.Errorf("start container %s err: %s", ctr.Id, err.Error())
  211. }
  212. }
  213. t.pod.SyncStatus("sync status after pod and containers restart locally", "")
  214. }
// Dump describes the task for worker diagnostics.
func (t *localPodRestartTask) Dump() string {
	return fmt.Sprintf("pod restart task %s/%s", t.pod.GetId(), t.pod.GetName())
}
  218. func GetPodStatusByContainerStatus(status string, cStatus string, isPrimary bool) string {
  219. if cStatus == computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF && isPrimary {
  220. status = computeapi.POD_STATUS_CRASH_LOOP_BACK_OFF
  221. }
  222. if cStatus == computeapi.CONTAINER_STATUS_EXITED && status != computeapi.VM_READY {
  223. // status = computeapi.POD_STATUS_CONTAINER_EXITED
  224. if isPrimary {
  225. status = computeapi.VM_READY
  226. }
  227. }
  228. return status
  229. }
// SCpuFreqRealTimeSimulateManager periodically estimates a per-container
// CPU frequency from cpuacct usage deltas and writes the simulated value
// into each container's scaling_cur_freq file.
type SCpuFreqRealTimeSimulateManager struct {
	// lastTimeCpuUsage holds the cpuacct.usage reading per container CRI id
	// from the previous tick (cumulative counter; presumably nanoseconds of
	// CPU time — confirm against the cgroup cpuacct documentation).
	lastTimeCpuUsage map[string]int64
	// lastTime is the wall-clock UnixNano timestamp of the previous tick;
	// nil until the first sample has been taken.
	lastTime *int64
	// interval is the sampling period in seconds.
	interval int
	// stop, when closed, terminates the simulate loop.
	stop chan struct{}
	// cpufreqMax and cpufreqMin bound the simulated frequency range.
	cpufreqMax int64
	cpufreqMin int64
}
  238. func newCpuFreqRealTimeSimulateManager(intervalSecond int, cpufreqMax, cpufreqMin int64) *SCpuFreqRealTimeSimulateManager {
  239. return &SCpuFreqRealTimeSimulateManager{
  240. interval: intervalSecond,
  241. stop: make(chan struct{}),
  242. cpufreqMax: cpufreqMax,
  243. cpufreqMin: cpufreqMin,
  244. }
  245. }
  246. func (m *SCpuFreqRealTimeSimulateManager) StartSetCpuFreqSimulate() {
  247. log.Infof("StartSetCpuFreqSimulate")
  248. ticker := time.NewTicker(time.Duration(m.interval) * time.Second)
  249. defer ticker.Stop()
  250. for {
  251. select {
  252. case <-ticker.C:
  253. m.startSetCpuFreqSimulate()
  254. case <-m.stop:
  255. return
  256. }
  257. }
  258. }
// Stop terminates the loop started by StartSetCpuFreqSimulate. It must be
// called at most once: closing an already-closed channel panics.
func (m *SCpuFreqRealTimeSimulateManager) Stop() {
	close(m.stop)
}
  262. func (m *SCpuFreqRealTimeSimulateManager) startSetCpuFreqSimulate() {
  263. newCpuUsage := map[string]int64{}
  264. startTime := time.Now().UnixNano()
  265. guestManager.Servers.Range(func(k, v interface{}) bool {
  266. pod, ok := v.(*sPodGuestInstance)
  267. if !ok {
  268. log.Errorf("is not pod instance")
  269. return false
  270. }
  271. cgroupRoot := path.Join(cgrouputils.GetSubModulePath("cpuacct"), PodCgroupParent())
  272. criIds := pod.GetPodContainerCriIds()
  273. for i := range criIds {
  274. ctr, err := pod.GetContainerByCRIId(criIds[i])
  275. if err != nil {
  276. log.Errorf("failed get %s ctrid by criId %s", pod.GetName(), criIds[i])
  277. continue
  278. }
  279. if ctr == nil {
  280. log.Errorf("found nil container desc by criId %s in pod %s", criIds[i], pod.GetName())
  281. continue
  282. }
  283. cpuDir := pod.getContainerSystemCpusDir(ctr.Id)
  284. if !fileutils2.Exists(cpuDir) {
  285. log.Debugf("%s %s has no cpuDir", pod.GetName(), criIds[i])
  286. continue
  287. }
  288. usagePath := path.Join(cgroupRoot, criIds[i], "cpuacct.usage")
  289. criCpuUsageStr, err := fileutils2.FileGetContents(usagePath)
  290. if err != nil {
  291. continue
  292. }
  293. criCpuUsageStr = strings.TrimSpace(criCpuUsageStr)
  294. criCpuUsage, err := strconv.ParseInt(criCpuUsageStr, 10, 0)
  295. if err != nil {
  296. log.Errorf("failed parse %s %s: %s", usagePath, criCpuUsageStr, err)
  297. continue
  298. }
  299. newCpuUsage[criIds[i]] = criCpuUsage
  300. if m.lastTime != nil {
  301. if lastTimeUsage, ok := m.lastTimeCpuUsage[criIds[i]]; ok {
  302. timeDiff := float64(startTime - *m.lastTime)
  303. usageDiff := float64(criCpuUsage - lastTimeUsage)
  304. cpuUsagePercent := (usageDiff / timeDiff) * 100
  305. freqRange := float64(m.cpufreqMax - m.cpufreqMin)
  306. estimatedFreq := m.cpufreqMin + int64(freqRange*cpuUsagePercent/100)
  307. err := pod.simulateContainerSystemCpuSetScalingCurFreq(ctr.Id, estimatedFreq)
  308. if err != nil {
  309. log.Errorf("failed set %s(%s) simulate cpufreq: %s", pod.GetId(), criIds[i], err)
  310. }
  311. }
  312. }
  313. }
  314. return true
  315. })
  316. m.lastTime = &startTime
  317. m.lastTimeCpuUsage = newCpuUsage
  318. }