hostmetrics.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package hostmetrics
import (
	"context"
	"fmt"
	"io"
	"runtime/debug"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/google/cadvisor/utils/sysfs"
	"github.com/shirou/gopsutil/host"
	psnet "github.com/shirou/gopsutil/v3/net"
	"github.com/shirou/gopsutil/v3/process"

	"yunion.io/x/log"
	"yunion.io/x/pkg/errors"
	"yunion.io/x/pkg/util/httputils"
	"yunion.io/x/pkg/util/netutils"
	"yunion.io/x/pkg/utils"

	billing_api "yunion.io/x/onecloud/pkg/apis/billing"
	"yunion.io/x/onecloud/pkg/apis/compute"
	"yunion.io/x/onecloud/pkg/hostman/guestman"
	"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
	"yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
	"yunion.io/x/onecloud/pkg/hostman/options"
	"yunion.io/x/onecloud/pkg/util/fileutils2"
	"yunion.io/x/onecloud/pkg/util/pod/stats"
	"yunion.io/x/onecloud/pkg/util/timeutils2"
)
const (
	// TelegrafServer is the local telegraf HTTP listener that accepts
	// influx line-protocol metrics via POST.
	TelegrafServer = "http://127.0.0.1:8087/write"
)
// SHostMetricsCollector periodically collects per-guest metrics and pushes
// them to the local telegraf endpoint.
type SHostMetricsCollector struct {
	ReportInterval    int // seconds between two collection rounds
	running           bool
	LastCollectTime   time.Time
	waitingReportData []string // lines that failed to upload; retried on the next successful round
	guestMonitor      *SGuestMonitorCollector
}

// hostMetricsCollector is the process-wide singleton created by Init.
var hostMetricsCollector *SHostMetricsCollector
// IHostInfo abstracts the host facilities the collectors need: container
// stats, per-vendor GPU presence, and dmesg reporting.
type IHostInfo interface {
	GetContainerStatsProvider() stats.ContainerStatsProvider
	HasContainerNvidiaGpu() bool
	HasContainerVastaitechGpu() bool
	HasContainerCphAmdGpu() bool
	GetNvidiaGpuIndexMemoryMap() map[string]int
	ReportHostDmesg(data []compute.SKmsgEntry) error
}

// hostDmesgCollector is the process-wide singleton created by Init.
var hostDmesgCollector *SHostDmesgCollector
// Init creates the package-level metric and dmesg collectors exactly once.
// It should be called before Start.
func Init(hostInfo IHostInfo) {
	if hostMetricsCollector == nil {
		hostMetricsCollector = NewHostMetricsCollector(hostInfo)
	}
	if hostDmesgCollector == nil {
		hostDmesgCollector = NewHostDmesgCollector(hostInfo)
	}
}
  70. func Start() {
  71. if hostMetricsCollector != nil {
  72. go hostMetricsCollector.Start()
  73. }
  74. if options.HostOptions.EnableDmesgCollect {
  75. timeutils2.AddTimeout(30*time.Second, hostDmesgCollector.Start)
  76. }
  77. }
// Stop signals the metrics collection loop to exit after its current
// iteration. The dmesg collector is not stopped here.
func Stop() {
	if hostMetricsCollector != nil {
		hostMetricsCollector.Stop()
	}
}
  83. func (m *SHostMetricsCollector) Start() {
  84. m.running = true
  85. for m.running {
  86. m.runMain()
  87. time.Sleep(time.Second * 1)
  88. }
  89. }
// Stop asks the Start loop to exit; it takes effect within one poll interval.
func (m *SHostMetricsCollector) Stop() {
	m.running = false
}
  93. func (m *SHostMetricsCollector) runMain() {
  94. timeBegin := time.Now()
  95. elapse := timeBegin.Sub(m.LastCollectTime)
  96. if elapse < time.Second*time.Duration(m.ReportInterval) {
  97. return
  98. }
  99. m.runMonitor(timeBegin, m.LastCollectTime)
  100. m.LastCollectTime = timeBegin
  101. }
  102. func (m *SHostMetricsCollector) runMonitor(now, last time.Time) {
  103. reportData := m.collectReportData(now, last)
  104. if options.HostOptions.EnableTelegraf && len(reportData) > 0 {
  105. m.reportUsageToTelegraf(reportData)
  106. }
  107. }
  108. func (m *SHostMetricsCollector) reportUsageToTelegraf(data string) {
  109. body := strings.NewReader(data)
  110. res, err := httputils.Request(httputils.GetDefaultClient(), context.Background(), "POST", TelegrafServer, nil, body, false)
  111. if err != nil {
  112. log.Errorf("Upload guest metric failed: %s", err)
  113. return
  114. }
  115. defer res.Body.Close()
  116. if res.StatusCode != 204 {
  117. resBody, _ := io.ReadAll(res.Body)
  118. log.Errorf("upload guest metric failed with %d %s, data: %s", res.StatusCode, string(resBody), data)
  119. timestamp := time.Now().UnixNano()
  120. for _, line := range strings.Split(data, "\n") {
  121. m.waitingReportData = append(m.waitingReportData,
  122. fmt.Sprintf("%s %d", line, timestamp))
  123. }
  124. } else {
  125. if len(m.waitingReportData) > 0 {
  126. oldDatas := strings.Join(m.waitingReportData, "\n")
  127. body = strings.NewReader(oldDatas)
  128. res, err = httputils.Request(httputils.GetDefaultClient(), context.Background(), "POST", TelegrafServer, nil, body, false)
  129. if err == nil {
  130. defer res.Body.Close()
  131. }
  132. if res.StatusCode == 204 {
  133. m.waitingReportData = m.waitingReportData[len(m.waitingReportData):]
  134. } else {
  135. log.Errorf("upload guest metric failed code: %d", res.StatusCode)
  136. }
  137. }
  138. }
  139. }
// collectReportData returns this round's telegraf payload. It also bounds
// the retry queue: once more than 60 failed lines are pending, the oldest
// one is dropped per round.
func (m *SHostMetricsCollector) collectReportData(now, last time.Time) string {
	if len(m.waitingReportData) > 60 {
		m.waitingReportData = m.waitingReportData[1:]
	}
	return m.guestMonitor.CollectReportData(now, last)
}
  146. func NewHostMetricsCollector(hostInfo IHostInfo) *SHostMetricsCollector {
  147. return &SHostMetricsCollector{
  148. ReportInterval: options.HostOptions.ReportInterval,
  149. waitingReportData: make([]string, 0, 10),
  150. guestMonitor: NewGuestMonitorCollector(hostInfo),
  151. }
  152. }
// SGuestMonitorCollector tracks one SGuestMonitor per running guest and
// keeps the previous round's data so per-second rates can be derived.
type SGuestMonitorCollector struct {
	monitors       map[string]*SGuestMonitor // guest id -> live monitor
	prevPids       map[string]int            // guest id -> pid of the previous round (detects restarts)
	prevReportData map[string]*GuestMetrics  // guest id -> previous round's metrics
	hostInfo       IHostInfo
}
  159. func NewGuestMonitorCollector(hostInfo IHostInfo) *SGuestMonitorCollector {
  160. return &SGuestMonitorCollector{
  161. monitors: make(map[string]*SGuestMonitor, 0),
  162. prevPids: make(map[string]int, 0),
  163. prevReportData: make(map[string]*GuestMetrics, 0),
  164. hostInfo: hostInfo,
  165. }
  166. }
// GetGuests rebuilds the monitor set from the guest manager's current server
// list and returns it (also replacing s.monitors). KVM monitors are reused
// when the QEMU pid is unchanged; pod stats and GPU process metrics are
// fetched lazily, once, on the first pod encountered.
func (s *SGuestMonitorCollector) GetGuests() map[string]*SGuestMonitor {
	var err error
	gms := make(map[string]*SGuestMonitor, 0)
	guestmanager := guestman.GetGuestManager()
	// Lazily-populated caches shared across the Range closure below.
	var podStats []stats.PodStats = nil
	var nvidiaGpuMetrics []NvidiaGpuProcessMetrics = nil
	var vastaitechGpuMetrics []VastaitechGpuProcessMetrics = nil
	var cphAmdGpuMetrics []CphAmdGpuProcessMetrics = nil
	var gpuPodProcs = s.collectGpuPodsProcesses()
	guestmanager.Servers.Range(func(k, v interface{}) bool {
		instance, ok := v.(guestman.GuestRuntimeInstance)
		if !ok {
			// NOTE(review): returning false aborts the whole Range walk on
			// the first unexpected entry — presumably such entries never
			// occur; confirm this is intended rather than `return true`.
			return false
		}
		if !instance.IsValid() {
			return false
		}
		hypervisor := instance.GetHypervisor()
		guestId := instance.GetId()
		guestName := instance.GetDesc().Name
		nicsDesc := instance.GetDesc().Nics
		vcpuCount := instance.GetDesc().Cpu
		switch hypervisor {
		case compute.HYPERVISOR_KVM:
			guest := instance.(*guestman.SKVMGuestInstance)
			pid := guest.GetPid()
			if pid > 0 {
				gm, ok := s.monitors[guestId]
				if ok && gm.Pid == pid {
					// Same process as last round: reuse the monitor, just
					// refresh the mutable guest attributes.
					delete(s.monitors, guestId)
					gm.UpdateVmName(guestName)
					gm.UpdateNicsDesc(nicsDesc)
					gm.UpdateCpuCount(int(vcpuCount))
					gm.MemMB = instance.GetDesc().Mem
				} else {
					// New guest or restarted QEMU: build a fresh monitor.
					delete(s.monitors, guestId)
					gm, err = NewGuestMonitor(instance, guestName, guestId, pid, nicsDesc, int(vcpuCount))
					if err != nil {
						log.Errorf("NewGuestMonitor for %s(%s), pid: %d, nics: %#v", guestName, guestId, pid, nicsDesc)
						return true
					}
				}
				gm.ScalingGroupId = guest.GetDesc().ScalingGroupId
				gm.Tenant = guest.GetDesc().Tenant
				gm.TenantId = guest.GetDesc().TenantId
				gm.DomainId = guest.GetDesc().DomainId
				gm.ProjectDomain = guest.GetDesc().ProjectDomain
				gms[guestId] = gm
			}
			return true
		case compute.HYPERVISOR_POD:
			// Fetch pod stats and GPU process metrics once for all pods.
			if podStats == nil {
				var err error
				csp := s.hostInfo.GetContainerStatsProvider()
				if csp == nil {
					log.Warningf("container stats provider is not ready")
					return true
				}
				podStats, err = csp.ListPodCPUAndMemoryStats()
				if err != nil {
					log.Errorf("ListPodCPUAndMemoryStats: %s", err)
					return true
				}
				if s.hostInfo.HasContainerNvidiaGpu() {
					nvidiaGpuMetrics, err = GetNvidiaGpuProcessMetrics()
					if err != nil {
						log.Errorf("GetNvidiaGpuProcessMetrics %s", err)
					}
				}
				if s.hostInfo.HasContainerVastaitechGpu() {
					vastaitechGpuMetrics, err = GetVastaitechGpuProcessMetrics()
					if err != nil {
						log.Errorf("GetVastaitechGpuProcessMetrics %s", err)
					}
				}
				if s.hostInfo.HasContainerCphAmdGpu() {
					cphAmdGpuMetrics, err = GetCphAmdGpuProcessMetrics()
					if err != nil {
						log.Errorf("GetCphAmdGpuProcessMetrics %s", err)
					}
				}
			}
			podStat, podProcs := GetPodStatsById(podStats, gpuPodProcs, guestId)
			if podStat != nil {
				gm, err := NewGuestPodMonitor(
					instance, guestName, guestId, podStat,
					nvidiaGpuMetrics, vastaitechGpuMetrics, cphAmdGpuMetrics,
					s.hostInfo, podProcs, nicsDesc, int(vcpuCount),
				)
				if err != nil {
					return true
				}
				gm.UpdateByInstance(instance)
				gms[guestId] = gm
				return true
			} else {
				// Pod has no stats this round; drop any stale monitor.
				delete(s.monitors, guestId)
			}
		}
		return true
	})
	s.monitors = gms
	return gms
}
  271. func (s *SGuestMonitorCollector) CollectReportData(now, last time.Time) (ret string) {
  272. defer func() {
  273. if r := recover(); r != nil {
  274. log.Errorln(r)
  275. debug.PrintStack()
  276. }
  277. }()
  278. gms := s.GetGuests()
  279. s.cleanedPrevData(gms)
  280. reportData := make(map[string]*GuestMetrics)
  281. for _, gm := range gms {
  282. prevUsage := s.prevReportData[gm.Id]
  283. reportData[gm.Id] = s.collectGmReport(gm, prevUsage)
  284. s.prevPids[gm.Id] = gm.Pid
  285. }
  286. s.saveNicTraffics(reportData, gms, now, last)
  287. s.prevReportData = reportData
  288. ret = s.toTelegrafReportData(reportData)
  289. return
  290. }
// saveNicTraffics accumulates per-NIC traffic for traffic-billed NICs,
// shuts down NICs that exceeded their configured limits, persists the
// records locally and syncs them to the region service. Runs under the
// guest manager's traffic lock.
func (s *SGuestMonitorCollector) saveNicTraffics(reportData map[string]*GuestMetrics, gms map[string]*SGuestMonitor, now, last time.Time) {
	guestman.GetGuestManager().TrafficLock.Lock()
	defer guestman.GetGuestManager().TrafficLock.Unlock()
	// Counters reset once per day: the first round after midnight is flagged.
	isReset := now.Day() != last.Day() // across day
	var guestNicsTraffics = compute.NewGuestNicTrafficSyncInput(now, isReset)
	for guestId, data := range reportData {
		gm := gms[guestId]
		guestTrafficRecord, err := guestman.GetGuestManager().GetGuestTrafficRecord(gm.Id)
		if err != nil {
			log.Errorf("failed get guest traffic record %s", err)
			continue
		}
		guestTrafficsToSend := make(map[string]*compute.SNicTrafficRecord)
		guestTrafficsToSave := make(map[string]*compute.SNicTrafficRecord)
		for i := range gm.Nics {
			// Only NICs billed by traffic are tracked.
			if gm.Nics[i].ChargeType != billing_api.NET_CHARGE_TYPE_BY_TRAFFIC {
				continue
			}
			// Find this NIC's report entry by MAC.
			var nicIo *NetIOMetric
			for j := range data.VmNetio {
				if gm.Nics[i].Mac == data.VmNetio[j].Meta.Mac {
					nicIo = data.VmNetio[j]
					break
				}
			}
			if nicIo == nil {
				log.Warningf("failed found report data for nic %s", gm.Nics[i].Ifname)
				continue
			}
			// Start from the persisted totals for this MAC.
			nicHasBeenSetDown := false
			nicTraffic := compute.SNicTrafficRecord{}
			for mac, record := range guestTrafficRecord {
				if mac == nicIo.Meta.Mac {
					nicTraffic.RxTraffic += record.RxTraffic
					nicTraffic.TxTraffic += record.TxTraffic
					nicHasBeenSetDown = record.HasBeenSetDown
				}
			}
			// Add this interval's bytes (BPS is bits/s, hence the /8).
			var nicDown = false
			nicTraffic.RxTraffic += int64(nicIo.TimeDiff * nicIo.BPSRecv / 8)
			if gm.Nics[i].RxTrafficLimit > 0 && nicTraffic.RxTraffic >= gm.Nics[i].RxTrafficLimit {
				// nic down
				nicDown = true
			}
			nicTraffic.TxTraffic += int64(nicIo.TimeDiff * nicIo.BPSSent / 8)
			if gm.Nics[i].TxTrafficLimit > 0 && nicTraffic.TxTraffic >= gm.Nics[i].TxTrafficLimit {
				// nic down
				nicDown = true
			}
			// Shut the NIC down once, the first time a limit is exceeded.
			if !nicHasBeenSetDown && nicDown {
				log.Infof("guest %s nic %d traffic exceed tx: %d, tx_limit: %d, rx: %d, rx_limit: %d, set nic down", gm.Id, nicIo.Meta.Index, nicTraffic.TxTraffic, gm.Nics[i].TxTrafficLimit, nicTraffic.RxTraffic, gm.Nics[i].RxTrafficLimit)
				gm.SetNicDown(gm.Nics[i].Mac)
				nicTraffic.HasBeenSetDown = true
			}
			guestTrafficsToSend[nicIo.Meta.Mac] = &nicTraffic
			// On the daily reset, postpaid NICs start over; prepaid totals
			// are carried forward and therefore still persisted.
			if gm.Nics[i].BillingType == billing_api.BILLING_TYPE_PREPAID || !isReset {
				guestTrafficsToSave[nicIo.Meta.Mac] = &nicTraffic
			}
		}
		if len(guestTrafficsToSend) > 0 {
			guestNicsTraffics.Traffic[gm.Id] = guestTrafficsToSend
		}
		if len(guestTrafficsToSave) > 0 {
			if err = guestman.GetGuestManager().SaveGuestTrafficRecord(gm.Id, guestTrafficsToSave); err != nil {
				log.Errorf("failed save guest %s traffic record %v", gm.Id, guestTrafficsToSave)
				continue
			}
		}
	}
	if len(guestNicsTraffics.Traffic) > 0 {
		guestman.SyncGuestNicsTraffics(guestNicsTraffics)
	}
}
  364. func (s *SGuestMonitorCollector) toTelegrafReportData(data map[string]*GuestMetrics) string {
  365. ret := []string{}
  366. for guestId, report := range data {
  367. var vmName, vmIp, vmIp6, scalingGroupId, tenant, tenantId, domainId, projectDomain, hypervisor string
  368. if gm, ok := s.monitors[guestId]; ok {
  369. vmName = gm.Name
  370. vmIp = gm.Ip
  371. vmIp6 = gm.Ip6
  372. scalingGroupId = gm.ScalingGroupId
  373. tenant = gm.Tenant
  374. tenantId = gm.TenantId
  375. domainId = gm.DomainId
  376. projectDomain = gm.ProjectDomain
  377. hypervisor = gm.Hypervisor
  378. }
  379. tags := map[string]string{
  380. "id": guestId, "vm_id": guestId, "vm_name": vmName, "hypervisor": hypervisor,
  381. "is_vm": "true", hostconsts.TELEGRAF_TAG_KEY_BRAND: hostconsts.TELEGRAF_TAG_ONECLOUD_BRAND,
  382. hostconsts.TELEGRAF_TAG_KEY_RES_TYPE: "guest",
  383. }
  384. if len(vmIp) > 0 {
  385. tags["vm_ip"] = vmIp
  386. }
  387. if len(vmIp6) > 0 {
  388. tags["vm_ip6"] = vmIp6
  389. }
  390. if len(scalingGroupId) > 0 {
  391. tags["vm_scaling_group_id"] = scalingGroupId
  392. }
  393. if len(tenant) > 0 {
  394. tags["tenant"] = tenant
  395. }
  396. if len(tenantId) > 0 {
  397. tags["tenant_id"] = tenantId
  398. }
  399. if len(domainId) > 0 {
  400. tags["domain_id"] = domainId
  401. }
  402. if len(projectDomain) > 0 {
  403. tags["project_domain"] = projectDomain
  404. }
  405. ret = append(ret, report.toTelegrafData(tags)...)
  406. }
  407. return strings.Join(ret, "\n")
  408. }
  409. func (s *SGuestMonitorCollector) cleanedPrevData(gms map[string]*SGuestMonitor) {
  410. for guestId := range s.prevReportData {
  411. if gm, ok := gms[guestId]; !ok {
  412. delete(s.prevReportData, guestId)
  413. delete(s.prevPids, guestId)
  414. } else {
  415. if s.prevPids[guestId] != gm.Pid {
  416. delete(s.prevReportData, guestId)
  417. delete(s.prevPids, guestId)
  418. }
  419. }
  420. }
  421. }
// GuestMetrics is one guest's metrics for a single collection round. For VM
// guests the Vm* fields are set; for pods PodMetrics is set instead.
type GuestMetrics struct {
	VmCpu      *CpuMetric    `json:"vm_cpu"`
	VmMem      *MemMetric    `json:"vm_mem"`
	VmNetio    []*NetIOMetric `json:"vm_netio"`
	VmDiskio   *DiskIOMetric `json:"vm_diskio"`
	PodMetrics *PodMetrics   `json:"pod_metrics"`
}
  429. func (d *GuestMetrics) mapToStatStr(m map[string]interface{}) string {
  430. var statArr = []string{}
  431. for k, v := range m {
  432. if vs, ok := v.(string); ok && len(vs) == 0 {
  433. continue
  434. }
  435. statArr = append(statArr, fmt.Sprintf("%s=%v", k, v))
  436. }
  437. return strings.Join(statArr, ",")
  438. }
  439. func (d *GuestMetrics) netioToTelegrafData(measurement string, tagStr string) []string {
  440. res := []string{}
  441. for i := range d.VmNetio {
  442. netTagMap := d.VmNetio[i].ToTag()
  443. for k, v := range netTagMap {
  444. if len(k) == 0 || len(v) == 0 {
  445. continue
  446. }
  447. tagStr = fmt.Sprintf("%s,%s=%s", tagStr, k, v)
  448. }
  449. res = append(res, fmt.Sprintf("%s,%s %s", measurement, tagStr, d.mapToStatStr(d.VmNetio[i].ToMap())))
  450. }
  451. return res
  452. }
  453. func (d *GuestMetrics) toVmTelegrafData(tagStr string) []string {
  454. var res = []string{}
  455. res = append(res, fmt.Sprintf("%s,%s %s", "vm_cpu", tagStr, d.mapToStatStr(d.VmCpu.ToMap())))
  456. res = append(res, fmt.Sprintf("%s,%s %s", "vm_mem", tagStr, d.mapToStatStr(d.VmMem.ToMap())))
  457. res = append(res, fmt.Sprintf("%s,%s %s", "vm_diskio", tagStr, d.mapToStatStr(d.VmDiskio.ToMap())))
  458. res = append(res, d.netioToTelegrafData("vm_netio", tagStr)...)
  459. return res
  460. }
  461. func (d *GuestMetrics) toTelegrafData(tags map[string]string) []string {
  462. var tagArr = []string{}
  463. for k, v := range tags {
  464. tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, strings.ReplaceAll(v, " ", "+")))
  465. }
  466. tagStr := strings.Join(tagArr, ",")
  467. if d.PodMetrics == nil {
  468. return d.toVmTelegrafData(tagStr)
  469. } else {
  470. return d.toPodTelegrafData(tagStr)
  471. }
  472. }
  473. func (s *SGuestMonitorCollector) collectGmReport(
  474. gm *SGuestMonitor, prevUsage *GuestMetrics,
  475. ) *GuestMetrics {
  476. if prevUsage == nil {
  477. prevUsage = new(GuestMetrics)
  478. }
  479. if !gm.HasPodMetrics() {
  480. return s.collectGuestMetrics(gm, prevUsage)
  481. } else {
  482. if isPodContainerStopped(prevUsage, gm.podStat) {
  483. log.Infof("pod %s(%s) has container(s) stopped, clear previous usage", gm.Name, gm.Id)
  484. prevUsage = new(GuestMetrics)
  485. }
  486. return s.collectPodMetrics(gm, prevUsage)
  487. }
  488. }
  489. func (s *SGuestMonitorCollector) collectGuestMetrics(gm *SGuestMonitor, prevUsage *GuestMetrics) *GuestMetrics {
  490. gmData := new(GuestMetrics)
  491. gmData.VmCpu = gm.Cpu()
  492. gmData.VmMem = gm.Mem()
  493. gmData.VmDiskio = gm.Diskio()
  494. gmData.VmNetio = gm.Netio()
  495. netio1 := gmData.VmNetio
  496. netio2 := prevUsage.VmNetio
  497. s.addNetio(netio1, netio2)
  498. diskio1 := gmData.VmDiskio
  499. diskio2 := prevUsage.VmDiskio
  500. s.addDiskio(diskio1, diskio2)
  501. return gmData
  502. }
  503. func (s *SGuestMonitorCollector) addDiskio(curInfo, prevInfo *DiskIOMetric) {
  504. if prevInfo != nil {
  505. s.reportDiskIo(curInfo, prevInfo)
  506. }
  507. }
  508. func (s *SGuestMonitorCollector) reportDiskIo(cur, prev *DiskIOMetric) {
  509. timeCur := cur.Meta.Uptime
  510. timeOld := prev.Meta.Uptime
  511. diffTime := float64(timeCur - timeOld)
  512. if diffTime > 0 {
  513. cur.ReadBPS = float64((cur.ReadBytes-prev.ReadBytes)*8) / diffTime
  514. cur.WriteBPS = float64((cur.WriteBytes-prev.WriteBytes)*8) / diffTime
  515. cur.ReadBps = float64(cur.ReadBytes-prev.ReadBytes) / diffTime
  516. cur.WriteBps = float64(cur.WriteBytes-prev.WriteBytes) / diffTime
  517. cur.ReadIOPS = float64(cur.ReadCount-prev.ReadCount) / diffTime
  518. cur.WriteIOPS = float64(cur.WriteCount-prev.WriteCount) / diffTime
  519. }
  520. }
  521. func (s *SGuestMonitorCollector) addNetio(curInfo, prevInfo []*NetIOMetric) {
  522. for _, v1 := range curInfo {
  523. for _, v2 := range prevInfo {
  524. if v1.Meta.Mac == v2.Meta.Mac {
  525. s.reportNetIo(v1, v2)
  526. }
  527. }
  528. }
  529. }
  530. func (s *SGuestMonitorCollector) reportNetIo(cur, prev *NetIOMetric) {
  531. timeCur := cur.Meta.Uptime
  532. timeOld := prev.Meta.Uptime
  533. diffTime := float64(timeCur - timeOld)
  534. cur.TimeDiff = diffTime
  535. if diffTime > 0 {
  536. if cur.BytesSent < prev.BytesSent {
  537. cur.BPSSent = float64(cur.BytesSent*8) / diffTime
  538. } else {
  539. cur.BPSSent = float64((cur.BytesSent-prev.BytesSent)*8) / diffTime
  540. }
  541. if cur.BytesRecv < prev.BytesRecv {
  542. cur.BPSRecv = float64(cur.BytesRecv*8) / diffTime
  543. } else {
  544. cur.BPSRecv = float64((cur.BytesRecv-prev.BytesRecv)*8) / diffTime
  545. }
  546. if cur.PacketsSent < prev.PacketsSent {
  547. cur.PPSSent = float64(cur.PacketsSent) / diffTime
  548. } else {
  549. cur.PPSSent = float64(cur.PacketsSent-prev.PacketsSent) / diffTime
  550. }
  551. if cur.PacketsRecv < prev.PacketsRecv {
  552. cur.PPSRecv = float64(cur.PacketsRecv) / diffTime
  553. } else {
  554. cur.PPSRecv = float64(cur.PacketsRecv-prev.PacketsRecv) / diffTime
  555. }
  556. }
  557. }
// SGuestMonitor samples metrics for one guest — either a KVM VM (Process is
// the QEMU process) or a pod (podStat is set, Process/Pid are zero).
type SGuestMonitor struct {
	Name           string
	Id             string
	Pid            int // QEMU pid for KVM guests; 0 for pods
	Nics           []*desc.SGuestNetwork
	CpuCnt         int
	MemMB          int64
	Ip             string // first non-empty IPv4 among Nics
	Ip6            string // first non-empty IPv6 among Nics
	Process        *process.Process
	ScalingGroupId string
	Tenant         string
	TenantId       string
	DomainId       string
	ProjectDomain  string
	// Pod-only fields, populated by NewGuestPodMonitor.
	podStat                 *stats.PodStats
	nvidiaGpuMetrics        []NvidiaGpuProcessMetrics
	nvidiaGpuIndexMemoryMap map[string]int
	vastaitechGpuMetrics    []VastaitechGpuProcessMetrics
	cphAmdGpuMetrics        []CphAmdGpuProcessMetrics
	instance                guestman.GuestRuntimeInstance
	sysFs                   sysfs.SysFs
	Hypervisor              string `json:"hypervisor"`
}
// NewGuestMonitor builds a monitor for a running KVM guest identified by its
// QEMU process pid. Fails if the pid cannot be resolved to a live process.
func NewGuestMonitor(instance guestman.GuestRuntimeInstance, name, id string, pid int, nics []*desc.SGuestNetwork, cpuCount int) (*SGuestMonitor, error) {
	proc, err := process.NewProcess(int32(pid))
	if err != nil {
		return nil, err
	}
	return newGuestMonitor(instance, name, id, proc, nics, cpuCount)
}
  589. func NewGuestPodMonitor(
  590. instance guestman.GuestRuntimeInstance, name, id string, stat *stats.PodStats,
  591. nvidiaGpuMetrics []NvidiaGpuProcessMetrics, vastaitechGpuMetrics []VastaitechGpuProcessMetrics, cphAmdGpuMetrics []CphAmdGpuProcessMetrics,
  592. hostInstance IHostInfo, podProcs map[string]struct{}, nics []*desc.SGuestNetwork, cpuCount int,
  593. ) (*SGuestMonitor, error) {
  594. m, err := newGuestMonitor(instance, name, id, nil, nics, cpuCount)
  595. if err != nil {
  596. return nil, errors.Wrap(err, "new pod GuestMonitor")
  597. }
  598. m.podStat = stat
  599. podDesc := instance.GetDesc()
  600. hasNvGpu := false
  601. hasCphAmdGpu := false
  602. hasVastaitechGpu := false
  603. for i := range podDesc.IsolatedDevices {
  604. if utils.IsInStringArray(podDesc.IsolatedDevices[i].DevType, compute.NVIDIA_GPU_TYPES) {
  605. hasNvGpu = true
  606. } else if podDesc.IsolatedDevices[i].DevType == compute.CONTAINER_DEV_VASTAITECH_GPU {
  607. hasVastaitechGpu = true
  608. } else if podDesc.IsolatedDevices[i].DevType == compute.CONTAINER_DEV_CPH_AMD_GPU {
  609. hasCphAmdGpu = true
  610. }
  611. }
  612. if hasNvGpu {
  613. m.nvidiaGpuMetrics = GetPodNvidiaGpuMetrics(nvidiaGpuMetrics, podProcs)
  614. m.nvidiaGpuIndexMemoryMap = hostInstance.GetNvidiaGpuIndexMemoryMap()
  615. }
  616. if hasVastaitechGpu {
  617. m.vastaitechGpuMetrics = GetPodVastaitechGpuMetrics(vastaitechGpuMetrics, podProcs)
  618. }
  619. if hasCphAmdGpu {
  620. m.cphAmdGpuMetrics = GetPodCphAmdGpuMetrics(cphAmdGpuMetrics, podProcs)
  621. }
  622. return m, nil
  623. }
  624. func newGuestMonitor(instance guestman.GuestRuntimeInstance, name, id string, proc *process.Process, nics []*desc.SGuestNetwork, cpuCount int) (*SGuestMonitor, error) {
  625. var ip, ip6 string
  626. if len(nics) >= 1 {
  627. for i := range nics {
  628. if len(ip) == 0 && len(nics[i].Ip) > 0 {
  629. ip = nics[i].Ip
  630. }
  631. if len(ip6) == 0 && len(nics[i].Ip6) > 0 {
  632. ip6 = nics[i].Ip6
  633. }
  634. }
  635. }
  636. pid := 0
  637. if proc != nil {
  638. pid = int(proc.Pid)
  639. }
  640. return &SGuestMonitor{
  641. Name: name,
  642. Id: id,
  643. Pid: pid,
  644. Nics: nics,
  645. CpuCnt: cpuCount,
  646. Ip: ip,
  647. Ip6: ip6,
  648. Process: proc,
  649. instance: instance,
  650. sysFs: sysfs.NewRealSysFs(),
  651. Hypervisor: instance.GetDesc().Hypervisor,
  652. }, nil
  653. }
  654. func (m *SGuestMonitor) UpdateByInstance(instance guestman.GuestRuntimeInstance) {
  655. guestName := instance.GetDesc().Name
  656. nicsDesc := instance.GetDesc().Nics
  657. vcpuCount := instance.GetDesc().Cpu
  658. m.UpdateVmName(guestName)
  659. m.UpdateNicsDesc(nicsDesc)
  660. m.UpdateCpuCount(int(vcpuCount))
  661. m.MemMB = instance.GetDesc().Mem
  662. m.ScalingGroupId = instance.GetDesc().ScalingGroupId
  663. m.Tenant = instance.GetDesc().Tenant
  664. m.TenantId = instance.GetDesc().TenantId
  665. m.DomainId = instance.GetDesc().DomainId
  666. m.ProjectDomain = instance.GetDesc().ProjectDomain
  667. m.Hypervisor = instance.GetDesc().Hypervisor
  668. }
  669. func (m *SGuestMonitor) SetNicDown(mac string) {
  670. guest, ok := guestman.GetGuestManager().GetKVMServer(m.Id)
  671. if !ok {
  672. return
  673. }
  674. if err := guest.SetNicDown(mac); err != nil {
  675. log.Errorf("guest %s SetNicDown failed %s", m.Id, err)
  676. }
  677. }
// UpdateVmName refreshes the cached guest display name.
func (m *SGuestMonitor) UpdateVmName(name string) {
	m.Name = name
}
// UpdateNicsDesc refreshes the cached NIC descriptors.
func (m *SGuestMonitor) UpdateNicsDesc(nics []*desc.SGuestNetwork) {
	m.Nics = nics
}
  684. func (m *SGuestMonitor) UpdateCpuCount(vcpuCount int) {
  685. if vcpuCount < 1 {
  686. vcpuCount = 1
  687. }
  688. m.CpuCnt = vcpuCount
  689. }
  690. func (m *SGuestMonitor) GetSriovNicStats(pfName string, virtfn int) (*psnet.IOCountersStat, error) {
  691. statsPath := fmt.Sprintf("/sys/class/net/%s/device/sriov/%d/stats", pfName, virtfn)
  692. if !fileutils2.Exists(statsPath) {
  693. return nil, nil
  694. }
  695. stats, err := fileutils2.FileGetContents(statsPath)
  696. if err != nil {
  697. return nil, errors.Wrapf(err, "read file %s", statsPath)
  698. }
  699. res := new(psnet.IOCountersStat)
  700. statsStr := string(stats)
  701. for _, line := range strings.Split(statsStr, "\n") {
  702. segs := strings.Split(line, ":")
  703. if len(segs) != 2 {
  704. continue
  705. }
  706. val, err := strconv.ParseUint(strings.TrimSpace(segs[1]), 10, 64)
  707. if err != nil {
  708. log.Errorf("failed parse %s", line)
  709. continue
  710. }
  711. switch strings.TrimSpace(segs[0]) {
  712. case "tx_packets":
  713. res.PacketsSent = val
  714. case "tx_bytes":
  715. res.BytesSent = val
  716. case "tx_dropped":
  717. res.Dropout = val
  718. case "rx_packets":
  719. res.PacketsRecv = val
  720. case "rx_bytes":
  721. res.BytesRecv = val
  722. case "rx_dropped":
  723. res.Dropin = val
  724. }
  725. }
  726. return res, nil
  727. }
// Netio samples per-NIC traffic counters for this guest. SR-IOV (vfio-pci)
// NICs are read from the PF's per-VF sysfs stats; all other NICs come from
// the host interface counters. Returns nil when the guest has no NICs or
// the host counters cannot be read.
func (m *SGuestMonitor) Netio() []*NetIOMetric {
	if len(m.Nics) == 0 {
		return nil
	}
	netstats, err := psnet.IOCounters(true)
	if err != nil {
		return nil
	}
	var res = []*NetIOMetric{}
	for i, nic := range m.Nics {
		var ifname = nic.Ifname
		var nicStat *psnet.IOCountersStat
		if nic.Driver == "vfio-pci" {
			// SR-IOV passthrough: counters only exist on the PF's sysfs tree,
			// and only KVM guests track the VF mapping.
			if guest, ok := guestman.GetGuestManager().GetKVMServer(m.Id); ok {
				dev, err := guest.GetSriovDeviceByNetworkIndex(nic.Index)
				if err != nil {
					log.Errorf("failed get sriov deivce by network index %s", err)
					continue
				}
				nicStat, err = m.GetSriovNicStats(dev.GetPfName(), dev.GetVirtfn())
				if err != nil {
					log.Errorf("failed get sriov nic stats: %s", err)
					continue
				}
			} else {
				continue
			}
		} else {
			// Find the host-side interface entry matching this NIC's ifname.
			for j, netstat := range netstats {
				if netstat.Name == ifname {
					nicStat = &netstats[j]
				}
			}
		}
		if nicStat == nil {
			continue
		}
		data := new(NetIOMetric)
		// Classify the NIC by its IPv4 address scope.
		ip := nic.Ip
		if len(ip) > 0 {
			ipv4, _ := netutils.NewIPV4Addr(ip)
			if netutils.IsExitAddress(ipv4) {
				data.Meta.IpType = "external"
			} else {
				data.Meta.IpType = "internal"
			}
		} else {
			data.Meta.IpType = "none"
		}
		data.Meta.Ip = ip
		data.Meta.Ip6 = nic.Ip6
		data.Meta.Index = i
		data.Meta.Mac = nic.Mac
		data.Meta.Ifname = ifname
		data.Meta.NetId = nic.NetId
		// Host uptime is the time base for rate deltas (see reportNetIo).
		data.Meta.Uptime, _ = host.Uptime()
		if nic.Driver == "vfio-pci" {
			// VF counters are already from the guest's perspective.
			data.BytesSent = nicStat.BytesSent
			data.BytesRecv = nicStat.BytesRecv
			data.PacketsRecv = nicStat.PacketsRecv
			data.PacketsSent = nicStat.PacketsSent
			data.ErrIn = nicStat.Errin
			data.ErrOut = nicStat.Errout
			data.DropIn = nicStat.Dropin
			data.DropOut = nicStat.Dropout
		} else {
			// Host-side counters are mirrored relative to the guest
			// (host rx == guest tx), hence the deliberate sent/recv swap.
			data.BytesSent = nicStat.BytesRecv
			data.BytesRecv = nicStat.BytesSent
			data.PacketsRecv = nicStat.PacketsSent
			data.PacketsSent = nicStat.PacketsRecv
			data.ErrIn = nicStat.Errout
			data.ErrOut = nicStat.Errin
			data.DropIn = nicStat.Dropout
			data.DropOut = nicStat.Dropin
		}
		res = append(res, data)
	}
	return res
}
// NetIOMetric holds cumulative network I/O counters for one guest NIC, as
// seen from the guest's perspective, plus rates derived later at report time.
type NetIOMetric struct {
	// Meta identifies the NIC; excluded from JSON and emitted as tags instead.
	Meta        NetMeta `json:"-"`
	BytesSent   uint64  `json:"bytes_sent"`
	BytesRecv   uint64  `json:"bytes_recv"`
	PacketsSent uint64  `json:"packets_sent"`
	PacketsRecv uint64  `json:"packets_recv"`
	ErrIn       uint64  `json:"err_in"`
	ErrOut      uint64  `json:"err_out"`
	DropIn      uint64  `json:"drop_in"`
	DropOut     uint64  `json:"drop_out"`
	// calculated on guest metrics report
	BPSRecv  float64 `json:"bps_recv"`
	BPSSent  float64 `json:"bps_sent"`
	PPSRecv  float64 `json:"pps_recv"`
	PPSSent  float64 `json:"pps_sent"`
	// TimeDiff is the sampling interval used for the rate calculations;
	// not serialized.
	TimeDiff float64 `json:"-"`
}
  824. func (n *NetIOMetric) ToMap() map[string]interface{} {
  825. return map[string]interface{}{
  826. "bytes_sent": n.BytesSent,
  827. "bytes_recv": n.BytesRecv,
  828. "packets_sent": n.PacketsSent,
  829. "packets_recv": n.PacketsRecv,
  830. "err_in": n.ErrIn,
  831. "err_out": n.ErrOut,
  832. "drop_in": n.DropIn,
  833. "drop_out": n.DropOut,
  834. "bps_recv": n.BPSRecv,
  835. "bps_sent": n.BPSSent,
  836. "pps_recv": n.PPSRecv,
  837. "pps_sent": n.PPSSent,
  838. }
  839. }
  840. func (n *NetIOMetric) ToTag() map[string]string {
  841. tags := map[string]string{
  842. "interface": fmt.Sprintf("eth%d", n.Meta.Index),
  843. "host_interface": n.Meta.Ifname,
  844. "mac": n.Meta.Mac,
  845. "ip_type": n.Meta.IpType,
  846. }
  847. if len(n.Meta.Ip) > 0 {
  848. tags["ip"] = n.Meta.Ip
  849. }
  850. if len(n.Meta.Ip6) > 0 {
  851. tags["ip6"] = n.Meta.Ip6
  852. }
  853. return tags
  854. }
// NetMeta describes the identity of a guest NIC; emitted as tags rather than
// measurement fields.
type NetMeta struct {
	// IpType is "external", "internal", or "none" (no IPv4 configured).
	IpType string `json:"ip_type"`
	Ip     string `json:"ip"`
	Mac    string `json:"mac"`
	// Index is the NIC's position in the guest's NIC list.
	Index  int    `json:"index"`
	Ifname string `json:"ifname"`
	NetId  string `json:"net_id"`
	// Uptime is the host uptime in seconds at sampling time.
	Uptime uint64 `json:"uptime"`
	Ip6    string `json:"ip6"`
}
// Cpu samples the QEMU process's CPU usage over a 100ms window and returns
// usage normalized by the guest's vCPU count, plus cumulative CPU times and
// thread count. Errors from the process queries are deliberately ignored
// (zero values are reported instead).
func (m *SGuestMonitor) Cpu() *CpuMetric {
	percent, _ := m.Process.Percent(time.Millisecond * 100)
	cpuTimes, _ := m.Process.Times()
	// Normalize process CPU% by vCPU count and round to 4 decimal places.
	percent, _ = strconv.ParseFloat(fmt.Sprintf("%0.4f", percent/float64(m.CpuCnt)), 64)
	threadCnt, _ := m.Process.NumThreads()
	return &CpuMetric{
		UsageActive: percent,
		// NOTE(review): percent was already divided by CpuCnt above; the
		// additional /CpuCnt in the two pcore fields looks like a double
		// normalization — confirm this is intended.
		CpuUsageIdlePcore: float64(100 - percent/float64(m.CpuCnt)),
		CpuUsagePcore:     float64(percent / float64(m.CpuCnt)),
		CpuTimeSystem:     cpuTimes.System,
		CpuTimeUser:       cpuTimes.User,
		CpuCount:          m.CpuCnt,
		ThreadCount:       threadCnt,
	}
}
// CpuMetric holds one CPU usage sample for a guest process.
type CpuMetric struct {
	// UsageActive is the process CPU usage percentage normalized by vCPU count.
	UsageActive       float64 `json:"usage_active"`
	CpuUsageIdlePcore float64 `json:"cpu_usage_idle_pcore"`
	CpuUsagePcore     float64 `json:"cpu_usage_pcore"`
	// Cumulative CPU times, in seconds.
	CpuTimeUser   float64 `json:"cpu_time_user"`
	CpuTimeSystem float64 `json:"cpu_time_system"`
	CpuCount      int     `json:"cpu_count"`
	ThreadCount   int32   `json:"thread_count"`
}
  889. func (c *CpuMetric) ToMap() map[string]interface{} {
  890. return map[string]interface{}{
  891. "usage_active": c.UsageActive,
  892. "cpu_usage_idle_pcore": c.CpuUsageIdlePcore,
  893. "cpu_usage_pcore": c.CpuUsagePcore,
  894. "cpu_time_user": c.CpuTimeUser,
  895. "cpu_time_system": c.CpuTimeSystem,
  896. "cpu_count": c.CpuCount,
  897. "thread_count": c.ThreadCount,
  898. }
  899. }
  900. func (m *SGuestMonitor) Diskio() *DiskIOMetric {
  901. io, err := m.Process.IOCounters()
  902. if err != nil {
  903. log.Errorln(err)
  904. return nil
  905. }
  906. ret := new(DiskIOMetric)
  907. ret.Meta.Uptime, _ = host.Uptime()
  908. ret.ReadCount = io.ReadCount
  909. ret.ReadBytes = io.ReadBytes
  910. ret.WriteBytes = io.WriteBytes
  911. ret.WriteCount = io.WriteCount
  912. return ret
  913. }
// DiskIOMetric holds cumulative process disk I/O counters plus rates derived
// later at report time.
type DiskIOMetric struct {
	// Meta is excluded from JSON; emitted separately.
	Meta       DiskIOMeta `json:"-"`
	ReadBytes  uint64     `json:"read_bytes"`
	WriteBytes uint64     `json:"write_bytes"`
	ReadCount  uint64     `json:"read_count"`
	WriteCount uint64     `json:"write_count"`
	// calculated on guest metrics report
	// Note the case-sensitive tags: "read_Bps" (bytes/s) vs "read_bps" (bits/s).
	ReadBps   float64 `json:"read_Bps"`
	WriteBps  float64 `json:"write_Bps"`
	ReadBPS   float64 `json:"read_bps"`
	WriteBPS  float64 `json:"write_bps"`
	ReadIOPS  float64 `json:"read_iops"`
	WriteIOPS float64 `json:"write_iops"`
}
  928. func (d *DiskIOMetric) ToMap() map[string]interface{} {
  929. return map[string]interface{}{
  930. "read_bytes": d.ReadBytes,
  931. "write_bytes": d.WriteBytes,
  932. "read_count": d.ReadCount,
  933. "write_count": d.WriteCount,
  934. "read_Bps": d.ReadBps,
  935. "write_Bps": d.WriteBps,
  936. "read_bps": d.ReadBPS,
  937. "write_bps": d.WriteBPS,
  938. "read_iops": d.ReadIOPS,
  939. "write_iops": d.WriteIOPS,
  940. }
  941. }
// DiskIOMeta carries sampling metadata for a disk I/O measurement.
type DiskIOMeta struct {
	// Uptime is the host uptime in seconds at sampling time.
	Uptime uint64 `json:"uptime"`
}
  945. func (m *SGuestMonitor) Mem() *MemMetric {
  946. mem, err := m.Process.MemoryInfo()
  947. usedPercent, _ := m.Process.MemoryPercent()
  948. if err != nil {
  949. log.Errorln(err)
  950. return nil
  951. }
  952. ret := new(MemMetric)
  953. ret.RSS = mem.RSS
  954. ret.VMS = mem.VMS
  955. ret.UsedPercent = usedPercent
  956. return ret
  957. }
// MemMetric holds one memory usage sample for a guest process.
type MemMetric struct {
	// RSS and VMS are in bytes.
	RSS uint64 `json:"rss"`
	VMS uint64 `json:"vms"`
	// UsedPercent is the process's share of host memory.
	UsedPercent float32 `json:"used_percent"`
}
  963. func (m *MemMetric) ToMap() map[string]interface{} {
  964. return map[string]interface{}{
  965. "rss": m.RSS,
  966. "vms": m.VMS,
  967. "used_percent": m.UsedPercent,
  968. }
  969. }