cri_stats_provider.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package stats
  15. import (
  16. "context"
  17. "fmt"
  18. "path"
  19. "sort"
  20. "strings"
  21. "sync"
  22. "time"
  23. cadvisorfs "github.com/google/cadvisor/fs"
  24. cadvisorapiv1 "github.com/google/cadvisor/info/v1"
  25. cadvisorapiv2 "github.com/google/cadvisor/info/v2"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
  29. "k8s.io/klog/v2"
  30. "yunion.io/x/pkg/errors"
  31. "yunion.io/x/onecloud/pkg/util/pod/cadvisor"
  32. )
var (
	// defaultCachePeriod is how long a cpuUsageCache entry may go without a
	// fresh sample before cleanupOutdatedCaches evicts it.
	defaultCachePeriod = 10 * time.Minute
)
// cpuUsageRecord is a cpuUsageCache entry: the last raw CRI CPU sample for a
// container plus the nano-core usage derived from consecutive samples.
type cpuUsageRecord struct {
	// stats is the most recent cumulative CPU usage reported by the CRI runtime.
	stats *runtimeapi.CpuUsage
	// usageNanoCores is the rate computed from the previous and current
	// samples; nil until two comparable samples have been observed.
	usageNanoCores *uint64
}
// criStatsProvider implements the ContainerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
	// cadvisor is used to get the node root filesystem's stats (such as the
	// capacity/available bytes/inodes) that will be populated in per container
	// filesystem stats.
	cadvisor cadvisor.Interface
	// runtimeService is used to get the status and stats of the pods and its
	// managed containers.
	runtimeService runtimeapi.RuntimeServiceClient
	// imageService is used to get the stats of the image filesystem.
	imageService runtimeapi.ImageServiceClient
	// cpuUsageCache caches the cpu usage for containers, keyed by container ID.
	cpuUsageCache map[string]*cpuUsageRecord
	// mutex guards cpuUsageCache (read-locked for lookups, write-locked for
	// updates and cleanup).
	mutex sync.RWMutex
}
// NewCRIContainerStatsProvider returns a ContainerStatsProvider backed by the
// given cadvisor, CRI runtime, and CRI image service clients. It is the
// exported entry point and simply delegates to newCRIStatsProvider.
func NewCRIContainerStatsProvider(
	cadvisor cadvisor.Interface,
	runtimeService runtimeapi.RuntimeServiceClient,
	imageService runtimeapi.ImageServiceClient,
) ContainerStatsProvider {
	return newCRIStatsProvider(cadvisor, runtimeService, imageService)
}
  64. // newCRIStatsProvider returns a ContainerStatsProvider implementation that
  65. // provides container stats using CRI.
  66. func newCRIStatsProvider(
  67. cadvisor cadvisor.Interface,
  68. runtimeService runtimeapi.RuntimeServiceClient,
  69. imageService runtimeapi.ImageServiceClient,
  70. ) ContainerStatsProvider {
  71. return &criStatsProvider{
  72. cadvisor: cadvisor,
  73. runtimeService: runtimeService,
  74. imageService: imageService,
  75. cpuUsageCache: make(map[string]*cpuUsageRecord),
  76. }
  77. }
// ListPodStats returns the stats of all the pod-managed containers without
// touching the cached CPU nano-core usage (read-only with respect to the cache).
func (p *criStatsProvider) ListPodStats() ([]PodStats, error) {
	// Don't update CPU nano core usage.
	return p.listPodStats(false)
}
// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// This is a workaround because CRI runtimes do not supply nano core usages,
// so this function calculates the difference between the current and the last
// (cached) cpu stats to compute this metric. The implementation assumes a
// single caller to periodically invoke this function to update the metrics. If
// there exist multiple callers, the period used to compute the cpu usage may
// vary and the usage could be incoherent (e.g., spiky). If no caller calls
// this function, the cpu usage will stay nil. Right now, eviction manager is
// the only caller, and it calls this function every 10s.
func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]PodStats, error) {
	// Update CPU nano core usage.
	return p.listPodStats(true)
}
// listPodStats assembles per-pod stats by joining CRI container stats with
// cadvisor data. It lists containers, sandboxes, and container stats from the
// CRI runtime, filters out terminated ones, and aggregates container-level
// stats into their owning pods. When updateCPUNanoCoreUsage is true, the
// per-container CPU usage cache is refreshed while deriving nano-core usage;
// otherwise only cached values are read.
func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]PodStats, error) {
	// Gets node root filesystem information, which will be used to populate
	// the available and capacity bytes/inodes in container stats.
	rootFsInfo, err := p.cadvisor.RootFsInfo()
	if err != nil {
		return nil, fmt.Errorf("failed to get rootFs info: %v", err)
	}
	csResp, err := p.runtimeService.ListContainers(context.Background(), &runtimeapi.ListContainersRequest{})
	if err != nil {
		return nil, errors.Wrap(err, "failed to list all containers")
	}
	containers := csResp.Containers
	// Creates pod sandbox map keyed by sandbox ID; terminated sandboxes are
	// dropped first so stale duplicates don't shadow running pods.
	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
	resp, err := p.runtimeService.ListPodSandbox(context.Background(), &runtimeapi.ListPodSandboxRequest{})
	if err != nil {
		return nil, errors.Wrap(err, "failed to list all pod sandboxes")
	}
	podSandboxes := removeTerminatedPods(resp.Items)
	for _, s := range podSandboxes {
		podSandboxMap[s.Id] = s
	}
	// fsIDtoInfo is a map from filesystem id to its stats. This will be used
	// as a cache to avoid querying cAdvisor for the filesystem stats with the
	// same filesystem id many times.
	fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
	sandboxIDToPodStats := make(map[string]*PodStats)
	cstsResp, err := p.runtimeService.ListContainerStats(context.Background(), &runtimeapi.ListContainerStatsRequest{})
	if err != nil {
		return nil, fmt.Errorf("failed to list all container stats: %v", err)
	}
	containers = removeTerminatedContainers(containers)
	// Creates container map keyed by container ID.
	containerMap := make(map[string]*runtimeapi.Container)
	for _, c := range containers {
		containerMap[c.Id] = c
	}
	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
	}
	caInfos := getCRICadvisorStats(allInfos)
	// get network stats for containers.
	// This is only used on Windows. For other platforms, (nil, nil) should be returned.
	containerNetworkStats, err := p.listContainerNetworkStats()
	if err != nil {
		return nil, fmt.Errorf("failed to list container network stats: %v", err)
	}
	for _, stats := range cstsResp.Stats {
		containerID := stats.Attributes.Id
		container, found := containerMap[containerID]
		if !found {
			// Stats for a container we no longer track (e.g. terminated).
			continue
		}
		podSandboxID := container.PodSandboxId
		podSandbox, found := podSandboxMap[podSandboxID]
		if !found {
			continue
		}
		// Creates the stats of the pod (if not created yet) which the
		// container belongs to.
		ps, found := sandboxIDToPodStats[podSandboxID]
		if !found {
			ps = buildPodStats(podSandbox)
			sandboxIDToPodStats[podSandboxID] = ps
		}
		// Fill available stats for full set of required pod stats
		cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage, allInfos)
		p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
		// NOTE(review): the two calls below pass the sandbox ID cast to a pod
		// UID, whereas the call above passes Metadata.Uid — confirm the
		// cadvisor lookup key used by getCadvisorPodInfoFromPodUID is the one
		// intended here.
		p.addDiskIoStats(ps, types.UID(podSandboxID), allInfos, cs)
		p.addProcessStats(ps, types.UID(podSandboxID), allInfos, cs)
		// If cadvisor stats is available for the container, use it to populate
		// container stats
		caStats, caFound := caInfos[containerID]
		if !caFound {
			klog.V(5).Infof("Unable to find cadvisor stats for %q", containerID)
		} else {
			p.addCadvisorContainerStats(cs, &caStats)
		}
		ps.Containers = append(ps.Containers, *cs)
	}
	// cleanup outdated caches.
	p.cleanupOutdatedCaches()
	result := make([]PodStats, 0, len(sandboxIDToPodStats))
	for _, s := range sandboxIDToPodStats {
		//p.makePodStorageStats(s, &rootFsInfo)
		result = append(result, *s)
	}
	return result, nil
}
// ListPodCPUAndMemoryStats returns CPU and memory stats of all the pod-managed
// containers. It mirrors listPodStats but skips filesystem and network stats,
// and never updates the CPU nano-core usage cache (read-only lookups only).
func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]PodStats, error) {
	ctx := context.Background()
	containersResp, err := p.runtimeService.ListContainers(ctx, &runtimeapi.ListContainersRequest{})
	if err != nil {
		return nil, fmt.Errorf("failed to list all containers: %v", err)
	}
	containers := containersResp.Containers
	// Creates pod sandbox map keyed by sandbox ID, with terminated pods removed.
	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
	resp, err := p.runtimeService.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{})
	if err != nil {
		return nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
	}
	podSandboxes := resp.Items
	podSandboxes = removeTerminatedPods(podSandboxes)
	for _, s := range podSandboxes {
		podSandboxMap[s.Id] = s
	}
	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
	sandboxIDToPodStats := make(map[string]*PodStats)
	containerStatResp, err := p.runtimeService.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{})
	if err != nil {
		return nil, fmt.Errorf("failed to list all container stats: %v", err)
	}
	containers = removeTerminatedContainers(containers)
	// Creates container map keyed by container ID.
	containerMap := make(map[string]*runtimeapi.Container)
	for _, c := range containers {
		containerMap[c.Id] = c
	}
	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
	}
	caInfos := getCRICadvisorStats(allInfos)
	for _, stats := range containerStatResp.Stats {
		containerID := stats.Attributes.Id
		container, found := containerMap[containerID]
		if !found {
			continue
		}
		podSandboxID := container.PodSandboxId
		podSandbox, found := podSandboxMap[podSandboxID]
		if !found {
			continue
		}
		// Creates the stats of the pod (if not created yet) which the
		// container belongs to.
		ps, found := sandboxIDToPodStats[podSandboxID]
		if !found {
			ps = buildPodStats(podSandbox)
			sandboxIDToPodStats[podSandboxID] = ps
		}
		// Fill available CPU and memory stats for full set of required pod stats
		cs := p.makeContainerCPUAndMemoryStats(stats, container, allInfos)
		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
		// NOTE(review): sandbox ID is cast to a pod UID here, unlike the
		// Metadata.Uid used just above — confirm this is the intended lookup key.
		p.addDiskIoStats(ps, types.UID(podSandboxID), allInfos, cs)
		p.addProcessStats(ps, types.UID(podSandboxID), allInfos, cs)
		// If cadvisor stats is available for the container, use it to populate
		// container stats
		caStats, caFound := caInfos[containerID]
		if !caFound {
			klog.V(4).Infof("Unable to find cadvisor stats for %q", containerID)
		} else {
			p.addCadvisorContainerStats(cs, &caStats)
		}
		ps.Containers = append(ps.Containers, *cs)
	}
	// cleanup outdated caches.
	p.cleanupOutdatedCaches()
	result := make([]PodStats, 0, len(sandboxIDToPodStats))
	for _, s := range sandboxIDToPodStats {
		result = append(result, *s)
	}
	return result, nil
}
// ImageFsStats is required by ContainerStatsProvider but is not yet
// implemented for the CRI provider; calling it panics.
func (p *criStatsProvider) ImageFsStats() (FsStats, error) {
	//TODO implement me
	panic("implement me")
}
// ImageFsDevice is required by ContainerStatsProvider but is not yet
// implemented for the CRI provider; calling it panics.
func (p *criStatsProvider) ImageFsDevice() (string, error) {
	//TODO implement me
	panic("implement me")
}
  272. // buildPodStats returns a PodStats that identifies the Pod managing cinfo
  273. func buildPodStats(podSandbox *runtimeapi.PodSandbox) *PodStats {
  274. return &PodStats{
  275. PodRef: PodReference{
  276. Name: podSandbox.Metadata.Name,
  277. UID: podSandbox.Metadata.Uid,
  278. Namespace: podSandbox.Metadata.Namespace,
  279. },
  280. // The StartTime in the summary API is the pod creation time.
  281. StartTime: metav1.NewTime(time.Unix(0, podSandbox.CreatedAt)),
  282. }
  283. }
  284. /*func (p *criStatsProvider) makePodStorageStats(s *PodStats, rootFsInfo *cadvisorapiv2.FsInfo) {
  285. podNs := s.PodRef.Namespace
  286. podName := s.PodRef.Name
  287. podUID := types.UID(s.PodRef.UID)
  288. vstats, found := p.resourceAnalyzer.GetPodVolumeStats(podUID)
  289. if !found {
  290. return
  291. }
  292. podLogDir := kuberuntime.BuildPodLogsDirectory(podNs, podName, podUID)
  293. logStats, err := p.getPodLogStats(podLogDir, rootFsInfo)
  294. if err != nil {
  295. klog.Errorf("Unable to fetch pod log stats for path %s: %v ", podLogDir, err)
  296. // If people do in-place upgrade, there might be pods still using
  297. // the old log path. For those pods, no pod log stats is returned.
  298. // We should continue generating other stats in that case.
  299. // calcEphemeralStorage tolerants logStats == nil.
  300. }
  301. ephemeralStats := make([]statsapi.VolumeStats, len(vstats.EphemeralVolumes))
  302. copy(ephemeralStats, vstats.EphemeralVolumes)
  303. s.VolumeStats = append(append([]statsapi.VolumeStats{}, vstats.EphemeralVolumes...), vstats.PersistentVolumes...)
  304. s.EphemeralStorage = calcEphemeralStorage(s.Containers, ephemeralStats, rootFsInfo, logStats, true)
  305. }*/
  306. func (p *criStatsProvider) addPodNetworkStats(
  307. ps *PodStats,
  308. podSandboxID string,
  309. caInfos map[string]cadvisorapiv2.ContainerInfo,
  310. cs *ContainerStats,
  311. netStats *NetworkStats,
  312. ) {
  313. caPodSandbox, found := caInfos[podSandboxID]
  314. // try get network stats from cadvisor first.
  315. if found {
  316. networkStats := cadvisorInfoToNetworkStats(&caPodSandbox)
  317. if networkStats != nil {
  318. ps.Network = networkStats
  319. return
  320. }
  321. }
  322. // Not found from cadvisor, get from netStats.
  323. if netStats != nil {
  324. ps.Network = netStats
  325. return
  326. }
  327. // TODO: sum Pod network stats from container stats.
  328. klog.V(4).Infof("Unable to find network stats for sandbox %q", podSandboxID)
  329. }
  330. func (p *criStatsProvider) addPodCPUMemoryStats(
  331. ps *PodStats,
  332. podUID types.UID,
  333. allInfos map[string]cadvisorapiv2.ContainerInfo,
  334. cs *ContainerStats,
  335. ) {
  336. // try get cpu and memory stats from cadvisor first.
  337. podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
  338. if podCgroupInfo != nil {
  339. cpu, memory := cadvisorInfoToCPUandMemoryStats(podCgroupInfo)
  340. ps.CPU = cpu
  341. ps.Memory = memory
  342. return
  343. }
  344. // Sum Pod cpu and memory stats from containers stats.
  345. if cs.CPU != nil {
  346. if ps.CPU == nil {
  347. ps.CPU = &CPUStats{}
  348. }
  349. ps.CPU.Time = cs.CPU.Time
  350. usageCoreNanoSeconds := getUint64Value(cs.CPU.UsageCoreNanoSeconds) + getUint64Value(ps.CPU.UsageCoreNanoSeconds)
  351. usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores)
  352. ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds
  353. ps.CPU.UsageNanoCores = &usageNanoCores
  354. }
  355. if cs.Memory != nil {
  356. if ps.Memory == nil {
  357. ps.Memory = &MemoryStats{}
  358. }
  359. ps.Memory.Time = cs.Memory.Time
  360. availableBytes := getUint64Value(cs.Memory.AvailableBytes) + getUint64Value(ps.Memory.AvailableBytes)
  361. usageBytes := getUint64Value(cs.Memory.UsageBytes) + getUint64Value(ps.Memory.UsageBytes)
  362. workingSetBytes := getUint64Value(cs.Memory.WorkingSetBytes) + getUint64Value(ps.Memory.WorkingSetBytes)
  363. rSSBytes := getUint64Value(cs.Memory.RSSBytes) + getUint64Value(ps.Memory.RSSBytes)
  364. pageFaults := getUint64Value(cs.Memory.PageFaults) + getUint64Value(ps.Memory.PageFaults)
  365. majorPageFaults := getUint64Value(cs.Memory.MajorPageFaults) + getUint64Value(ps.Memory.MajorPageFaults)
  366. ps.Memory.AvailableBytes = &availableBytes
  367. ps.Memory.UsageBytes = &usageBytes
  368. ps.Memory.WorkingSetBytes = &workingSetBytes
  369. ps.Memory.RSSBytes = &rSSBytes
  370. ps.Memory.PageFaults = &pageFaults
  371. ps.Memory.MajorPageFaults = &majorPageFaults
  372. }
  373. }
  374. func (p *criStatsProvider) addDiskIoStats(
  375. ps *PodStats,
  376. podUID types.UID,
  377. allInfos map[string]cadvisorapiv2.ContainerInfo,
  378. cs *ContainerStats) {
  379. info := getCadvisorPodInfoFromPodUID(podUID, allInfos)
  380. if info != nil {
  381. ps.DiskIo = cadvisorInfoToDiskIoStats(info)
  382. }
  383. if ps.DiskIo == nil {
  384. ps.DiskIo = make(map[string]*DiskIoStat)
  385. }
  386. ps.DiskIo.Add(cs.DiskIo)
  387. }
  388. func (p *criStatsProvider) addProcessStats(
  389. ps *PodStats,
  390. podUID types.UID,
  391. allInfos map[string]cadvisorapiv2.ContainerInfo,
  392. cs *ContainerStats,
  393. ) {
  394. // try get process stats from cadvisor only.
  395. info := getCadvisorPodInfoFromPodUID(podUID, allInfos)
  396. if info != nil {
  397. ps.ProcessStats = cadvisorInfoToProcessStats(info)
  398. }
  399. if cs.ProcessStats != nil {
  400. if ps.ProcessStats == nil {
  401. ps.ProcessStats = &ProcessStats{}
  402. }
  403. ps.ProcessStats.ProcessCount += cs.ProcessStats.ProcessCount
  404. ps.ProcessStats.FdCount += cs.ProcessStats.FdCount
  405. ps.ProcessStats.SocketCount += cs.ProcessStats.SocketCount
  406. ps.ProcessStats.ThreadsCurrent += cs.ProcessStats.ThreadsCurrent
  407. ps.ProcessStats.ThreadsMax += cs.ProcessStats.ThreadsMax
  408. }
  409. }
  410. // getFsInfo returns the information of the filesystem with the specified
  411. // fsID. If any error occurs, this function logs the error and returns
  412. // nil.
  413. func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) *cadvisorapiv2.FsInfo {
  414. if fsID == nil {
  415. klog.V(2).Infof("Failed to get filesystem info: fsID is nil.")
  416. return nil
  417. }
  418. mountpoint := fsID.GetMountpoint()
  419. fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint)
  420. if err != nil {
  421. msg := fmt.Sprintf("Failed to get the info of the filesystem with mountpoint %q: %v.", mountpoint, err)
  422. if err == cadvisorfs.ErrNoSuchDevice {
  423. klog.V(2).Info(msg)
  424. } else {
  425. klog.Error(msg)
  426. }
  427. return nil
  428. }
  429. return &fsInfo
  430. }
// makeContainerStats converts one CRI ContainerStats sample into the summary
// ContainerStats, filling CPU, memory, process, disk-IO, and writable-layer
// filesystem stats. Missing CRI CPU/memory samples fall back to zeroed values
// stamped with the current time. fsIDtoInfo memoizes filesystem lookups
// across containers; infos supplies the cadvisor data for process/disk-IO and
// memory usage/RSS. rootFsInfo and meta are currently unused by the active
// code (only referenced by the commented-out log-stats section).
func (p *criStatsProvider) makeContainerStats(stats *runtimeapi.ContainerStats, container *runtimeapi.Container, rootFsInfo *cadvisorapiv2.FsInfo, fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo, meta *runtimeapi.PodSandboxMetadata, updateCPUNanoCoreUsage bool, infos map[string]cadvisorapiv2.ContainerInfo) *ContainerStats {
	result := &ContainerStats{
		Name: stats.Attributes.Metadata.Name,
		// The StartTime in the summary API is the container creation time.
		StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
		CPU:       &CPUStats{},
		Memory:    &MemoryStats{},
		Rootfs:    &FsStats{},
		// UserDefinedMetrics is not supported by CRI.
		ProcessStats: &ProcessStats{},
	}
	// process stats from cadvisor (also supplies disk IO and memory usage/RSS).
	cStats := getLatestContainerStatsById(stats.Attributes.GetId(), infos)
	if cStats != nil {
		result.ProcessStats = convertToProcessStats(cStats.Processes)
		result.DiskIo = convertToDiskIoStats(cStats.DiskIo)
	}
	if stats.Cpu != nil {
		result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
		if stats.Cpu.UsageCoreNanoSeconds != nil {
			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
		}
		// Nano-core usage is derived from consecutive samples; optionally
		// refresh the cache while computing it.
		var usageNanoCores *uint64
		if updateCPUNanoCoreUsage {
			usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
		} else {
			usageNanoCores = p.getContainerUsageNanoCores(stats)
		}
		if usageNanoCores != nil {
			result.CPU.UsageNanoCores = usageNanoCores
		}
	} else {
		// No CRI CPU sample: report zeros stamped with "now".
		result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
		result.CPU.UsageNanoCores = uint64Ptr(0)
	}
	if stats.Memory != nil {
		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
		if stats.Memory.WorkingSetBytes != nil {
			result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
		}
		// Usage and RSS come from cadvisor, not CRI.
		if cStats != nil && cStats.Memory != nil {
			result.Memory.UsageBytes = &cStats.Memory.Usage
			result.Memory.RSSBytes = &cStats.Memory.RSS
		}
	} else {
		result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.Memory.WorkingSetBytes = uint64Ptr(0)
	}
	if stats.WritableLayer != nil {
		result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
		if stats.WritableLayer.UsedBytes != nil {
			result.Rootfs.UsedBytes = &stats.WritableLayer.UsedBytes.Value
		}
		if stats.WritableLayer.InodesUsed != nil {
			result.Rootfs.InodesUsed = &stats.WritableLayer.InodesUsed.Value
		}
	}
	fsID := stats.GetWritableLayer().GetFsId()
	if fsID != nil {
		// Look up (and memoize) the filesystem capacity/availability; a nil
		// entry is cached too so failed lookups aren't retried per container.
		imageFsInfo, found := fsIDtoInfo[*fsID]
		if !found {
			imageFsInfo = p.getFsInfo(fsID)
			fsIDtoInfo[*fsID] = imageFsInfo
		}
		if imageFsInfo != nil {
			// The image filesystem id is unknown to the local node or there's
			// an error on retrieving the stats. In these cases, we omit those stats
			// and return the best-effort partial result. See
			// https://github.com/kubernetes/heapster/issues/1793.
			result.Rootfs.AvailableBytes = &imageFsInfo.Available
			result.Rootfs.CapacityBytes = &imageFsInfo.Capacity
			result.Rootfs.InodesFree = imageFsInfo.InodesFree
			result.Rootfs.Inodes = imageFsInfo.Inodes
		}
	}
	// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
	// using old log path, empty log stats are returned. This is fine, because we don't
	// officially support in-place upgrade anyway.
	/*var (
	containerLogPath = kuberuntime.BuildContainerLogsDirectory(meta.GetNamespace(),
	meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName())
	err error
	)
	result.Logs, err = p.getPathFsStats(containerLogPath, rootFsInfo)
	if err != nil {
	klog.Errorf("Unable to fetch container log stats for path %s: %v ", containerLogPath, err)
	}*/
	return result
}
  521. func convertToProcessStats(cStats *cadvisorapiv1.ProcessStats) *ProcessStats {
  522. if cStats == nil {
  523. return nil
  524. }
  525. return &ProcessStats{
  526. ProcessCount: cStats.ProcessCount,
  527. FdCount: cStats.FdCount,
  528. SocketCount: cStats.SocketCount,
  529. ThreadsCurrent: cStats.ThreadsCurrent,
  530. ThreadsMax: cStats.ThreadsMax,
  531. }
  532. }
// makeContainerCPUAndMemoryStats converts one CRI ContainerStats sample into a
// summary ContainerStats limited to CPU, memory, process, and disk-IO stats.
// It reads (but never updates) the cached nano-core usage. Missing CRI
// CPU/memory samples fall back to zeroed values stamped with the current time.
func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
	stats *runtimeapi.ContainerStats,
	container *runtimeapi.Container,
	infos map[string]cadvisorapiv2.ContainerInfo,
) *ContainerStats {
	result := &ContainerStats{
		Name: stats.Attributes.Metadata.Name,
		// The StartTime in the summary API is the container creation time.
		StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
		CPU:       &CPUStats{},
		Memory:    &MemoryStats{},
		// UserDefinedMetrics is not supported by CRI.
		ProcessStats: &ProcessStats{},
	}
	// process stats from cadvisor (also supplies disk IO and memory usage/RSS).
	cStats := getLatestContainerStatsById(stats.Attributes.GetId(), infos)
	if cStats != nil {
		result.ProcessStats = convertToProcessStats(cStats.Processes)
		result.DiskIo = convertToDiskIoStats(cStats.DiskIo)
	}
	if stats.Cpu != nil {
		result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
		if stats.Cpu.UsageCoreNanoSeconds != nil {
			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
		}
		// Read-only cache lookup; this path never recomputes nano-core usage.
		usageNanoCores := p.getContainerUsageNanoCores(stats)
		if usageNanoCores != nil {
			result.CPU.UsageNanoCores = usageNanoCores
		}
	} else {
		// No CRI CPU sample: report zeros stamped with "now".
		result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
		result.CPU.UsageNanoCores = uint64Ptr(0)
	}
	if stats.Memory != nil {
		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
		if stats.Memory.WorkingSetBytes != nil {
			result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
		}
		// Usage and RSS come from cadvisor, not CRI.
		if cStats != nil && cStats.Memory != nil {
			result.Memory.UsageBytes = &cStats.Memory.Usage
			result.Memory.RSSBytes = &cStats.Memory.RSS
		}
	} else {
		result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.Memory.WorkingSetBytes = uint64Ptr(0)
	}
	return result
}
  582. // getContainerUsageNanoCores gets the cached usageNanoCores.
  583. func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
  584. if stats == nil || stats.Attributes == nil {
  585. return nil
  586. }
  587. p.mutex.RLock()
  588. defer p.mutex.RUnlock()
  589. cached, ok := p.cpuUsageCache[stats.Attributes.Id]
  590. if !ok || cached.usageNanoCores == nil {
  591. return nil
  592. }
  593. // return a copy of the usage
  594. latestUsage := *cached.usageNanoCores
  595. return &latestUsage
  596. }
// getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the
// given and the cached usageCoreNanoSeconds, updates the cache with the
// computed usageNanoCores, and returns the usageNanoCores.
// (The comment previously misnamed this as getContainerUsageNanoCores.)
func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
	// Need a full sample to compute a rate; bail out otherwise.
	if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
		return nil
	}
	id := stats.Attributes.Id
	usage, err := func() (*uint64, error) {
		p.mutex.Lock()
		defer p.mutex.Unlock()
		cached, ok := p.cpuUsageCache[id]
		// A counter that went backwards means the container restarted (or the
		// runtime reset); re-seed the cache instead of computing a bogus rate.
		if !ok || cached.stats.UsageCoreNanoSeconds == nil || stats.Cpu.UsageCoreNanoSeconds.Value < cached.stats.UsageCoreNanoSeconds.Value {
			// Cannot compute the usage now, but update the cached stats anyway
			p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
			return nil, nil
		}
		newStats := stats.Cpu
		cachedStats := cached.stats
		nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
		if nanoSeconds <= 0 {
			return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
		}
		// rate = delta(cumulative ns of CPU) / delta(wall ns), scaled to nano-cores.
		usageNanoCores := uint64(float64(newStats.UsageCoreNanoSeconds.Value-cachedStats.UsageCoreNanoSeconds.Value) /
			float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
		// Update cache with new value.
		usageToUpdate := usageNanoCores
		p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
		return &usageNanoCores, nil
	}()
	if err != nil {
		// This should not happen. Log now to raise visibility
		klog.Errorf("failed updating cpu usage nano core: %v", err)
	}
	return usage
}
  633. func (p *criStatsProvider) cleanupOutdatedCaches() {
  634. p.mutex.Lock()
  635. defer p.mutex.Unlock()
  636. for k, v := range p.cpuUsageCache {
  637. if v == nil {
  638. delete(p.cpuUsageCache, k)
  639. continue
  640. }
  641. if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
  642. delete(p.cpuUsageCache, k)
  643. }
  644. }
  645. }
  646. // removeTerminatedPods returns pods with terminated ones removed.
  647. // It only removes a terminated pod when there is a running instance
  648. // of the pod with the same name and namespace.
  649. // This is needed because:
  650. // 1) PodSandbox may be recreated;
  651. // 2) Pod may be recreated with the same name and namespace.
  652. func removeTerminatedPods(pods []*runtimeapi.PodSandbox) []*runtimeapi.PodSandbox {
  653. podMap := make(map[PodReference][]*runtimeapi.PodSandbox)
  654. // Sort order by create time
  655. sort.Slice(pods, func(i, j int) bool {
  656. return pods[i].CreatedAt < pods[j].CreatedAt
  657. })
  658. for _, pod := range pods {
  659. refID := PodReference{
  660. Name: pod.GetMetadata().GetName(),
  661. Namespace: pod.GetMetadata().GetNamespace(),
  662. // UID is intentionally left empty.
  663. }
  664. podMap[refID] = append(podMap[refID], pod)
  665. }
  666. result := make([]*runtimeapi.PodSandbox, 0)
  667. for _, refs := range podMap {
  668. if len(refs) == 1 {
  669. result = append(result, refs[0])
  670. continue
  671. }
  672. found := false
  673. for i := 0; i < len(refs); i++ {
  674. if refs[i].State == runtimeapi.PodSandboxState_SANDBOX_READY {
  675. found = true
  676. result = append(result, refs[i])
  677. }
  678. }
  679. if !found {
  680. result = append(result, refs[len(refs)-1])
  681. }
  682. }
  683. return result
  684. }
  685. // removeTerminatedContainers removes all terminated containers since they should
  686. // not be used for usage calculations.
  687. func removeTerminatedContainers(containers []*runtimeapi.Container) []*runtimeapi.Container {
  688. containerMap := make(map[containerID][]*runtimeapi.Container)
  689. // Sort order by create time
  690. sort.Slice(containers, func(i, j int) bool {
  691. return containers[i].CreatedAt < containers[j].CreatedAt
  692. })
  693. for _, container := range containers {
  694. refID := containerID{
  695. podRef: buildPodRef(container.Labels),
  696. containerName: GetContainerName(container.Labels),
  697. }
  698. containerMap[refID] = append(containerMap[refID], container)
  699. }
  700. result := make([]*runtimeapi.Container, 0)
  701. for _, refs := range containerMap {
  702. for i := 0; i < len(refs); i++ {
  703. if refs[i].State == runtimeapi.ContainerState_CONTAINER_RUNNING {
  704. result = append(result, refs[i])
  705. }
  706. }
  707. }
  708. return result
  709. }
  710. func (p *criStatsProvider) addCadvisorContainerStats(
  711. cs *ContainerStats,
  712. caPodStats *cadvisorapiv2.ContainerInfo,
  713. ) {
  714. if caPodStats.Spec.HasCustomMetrics {
  715. cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
  716. }
  717. cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
  718. if cpu != nil {
  719. cs.CPU = cpu
  720. }
  721. if memory != nil {
  722. cs.Memory = memory
  723. }
  724. }
  725. func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) map[string]cadvisorapiv2.ContainerInfo {
  726. stats := make(map[string]cadvisorapiv2.ContainerInfo)
  727. infos = removeTerminatedContainerInfo(infos)
  728. for key, info := range infos {
  729. // On systemd using devicemapper each mount into the container has an
  730. // associated cgroup. We ignore them to ensure we do not get duplicate
  731. // entries in our summary. For details on .mount units:
  732. // http://man7.org/linux/man-pages/man5/systemd.mount.5.html
  733. if strings.HasSuffix(key, ".mount") {
  734. continue
  735. }
  736. // Build the Pod key if this container is managed by a Pod
  737. if !isPodManagedContainer(&info) {
  738. continue
  739. }
  740. stats[path.Base(key)] = info
  741. }
  742. return stats
  743. }
  744. /*func (p *criStatsProvider) getPathFsStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
  745. m := p.logMetricsService.createLogMetricsProvider(path)
  746. logMetrics, err := m.GetMetrics()
  747. if err != nil {
  748. return nil, err
  749. }
  750. result := &statsapi.FsStats{
  751. Time: metav1.NewTime(rootFsInfo.Timestamp),
  752. AvailableBytes: &rootFsInfo.Available,
  753. CapacityBytes: &rootFsInfo.Capacity,
  754. InodesFree: rootFsInfo.InodesFree,
  755. Inodes: rootFsInfo.Inodes,
  756. }
  757. usedbytes := uint64(logMetrics.Used.Value())
  758. result.UsedBytes = &usedbytes
  759. inodesUsed := uint64(logMetrics.InodesUsed.Value())
  760. result.InodesUsed = &inodesUsed
  761. result.Time = maxUpdateTime(&result.Time, &logMetrics.Time)
  762. return result, nil
  763. }*/
  764. // getPodLogStats gets stats for logs under the pod log directory. Container logs usually exist
  765. // under the container log directory. However, for some container runtimes, e.g. kata, gvisor,
  766. // they may want to keep some pod level logs, in that case they can put those logs directly under
  767. // the pod log directory. And kubelet will take those logs into account as part of pod ephemeral
  768. // storage.
  769. /*func (p *criStatsProvider) getPodLogStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
  770. files, err := p.osInterface.ReadDir(path)
  771. if err != nil {
  772. return nil, err
  773. }
  774. result := &statsapi.FsStats{
  775. Time: metav1.NewTime(rootFsInfo.Timestamp),
  776. AvailableBytes: &rootFsInfo.Available,
  777. CapacityBytes: &rootFsInfo.Capacity,
  778. InodesFree: rootFsInfo.InodesFree,
  779. Inodes: rootFsInfo.Inodes,
  780. }
  781. for _, f := range files {
  782. if f.IsDir() {
  783. continue
  784. }
  785. // Only include *files* under pod log directory.
  786. fpath := filepath.Join(path, f.Name())
  787. fstats, err := p.getPathFsStats(fpath, rootFsInfo)
  788. if err != nil {
  789. return nil, fmt.Errorf("failed to get fsstats for %q: %v", fpath, err)
  790. }
  791. result.UsedBytes = addUsage(result.UsedBytes, fstats.UsedBytes)
  792. result.InodesUsed = addUsage(result.InodesUsed, fstats.InodesUsed)
  793. result.Time = maxUpdateTime(&result.Time, &fstats.Time)
  794. }
  795. return result, nil
  796. }*/