system.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package misc
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "net/url"
  21. "regexp"
  22. "strings"
  23. "time"
  24. "yunion.io/x/jsonutils"
  25. "yunion.io/x/log"
  26. "yunion.io/x/pkg/errors"
  27. "yunion.io/x/pkg/gotypes"
  28. "yunion.io/x/pkg/util/httputils"
  29. "yunion.io/x/pkg/utils"
  30. "yunion.io/x/onecloud/pkg/apis"
  31. compute_api "yunion.io/x/onecloud/pkg/apis/compute"
  32. api "yunion.io/x/onecloud/pkg/apis/identity"
  33. "yunion.io/x/onecloud/pkg/cloudcommon/tsdb"
  34. "yunion.io/x/onecloud/pkg/cloudmon/options"
  35. "yunion.io/x/onecloud/pkg/mcclient"
  36. "yunion.io/x/onecloud/pkg/mcclient/auth"
  37. "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  38. "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
  39. baseoptions "yunion.io/x/onecloud/pkg/mcclient/options"
  40. "yunion.io/x/onecloud/pkg/util/influxdb"
  41. )
// Database and measurement names used by the service metric collectors in
// this file. The misspelled identifiers (METIRCY, REQUST) are kept as-is
// because they are exported and may be referenced from other packages.
const (
	// SYSTEM_METRIC_DATABASE is the database name passed to
	// influxdb.SendMetrics by CollectServiceMetrics.
	SYSTEM_METRIC_DATABASE = "system"

	// METIRCY_TYPE_HTTP_REQUST is not referenced in the visible part of this
	// file; presumably it names the measurement produced by the HTTP stats
	// snapshot code — TODO confirm.
	METIRCY_TYPE_HTTP_REQUST = "http_request"
	// METIRCY_TYPE_WORKER is the measurement name of the metrics built by
	// collectWorkerMetrics.
	METIRCY_TYPE_WORKER = "worker"
	// METIRCY_TYPE_DB_STATS is the measurement name of the metric built by
	// collectDatabaseMetrics.
	METIRCY_TYPE_DB_STATS = "db_stats"
	// METIRCY_TYPE_PROCESS is the measurement name of the metric built by
	// collectProcessMetrics.
	METIRCY_TYPE_PROCESS = "process"
)
  49. func getEndpoints(s *mcclient.ClientSession) ([]api.EndpointDetails, error) {
  50. ret := make([]api.EndpointDetails, 0)
  51. params := baseoptions.BaseListOptions{}
  52. limit := 1024
  53. params.Limit = &limit
  54. boolTrue := true
  55. params.Details = &boolTrue
  56. params.Scope = "system"
  57. params.Filter = []string{
  58. "interface.equals(internal)",
  59. "enabled.equals(1)",
  60. }
  61. for {
  62. offset := len(ret)
  63. params.Offset = &offset
  64. resp, err := identity.EndpointsV3.List(s, jsonutils.Marshal(params))
  65. if err != nil {
  66. return nil, errors.Wrapf(err, "Endpoints.List")
  67. }
  68. for i := range resp.Data {
  69. endpoint := api.EndpointDetails{}
  70. err := resp.Data[i].Unmarshal(&endpoint)
  71. if err != nil {
  72. return nil, errors.Wrapf(err, "Unmarshal")
  73. }
  74. ret = append(ret, endpoint)
  75. }
  76. if len(ret) >= resp.Total {
  77. break
  78. }
  79. }
  80. return ret, nil
  81. }
  82. func getHosts(s *mcclient.ClientSession) ([]compute_api.HostDetails, error) {
  83. params := compute_api.HostListInput{}
  84. boolFalse := false
  85. limit := 100
  86. params.Limit = &limit
  87. params.Brand = []string{compute_api.CLOUD_PROVIDER_ONECLOUD}
  88. params.Scope = "system"
  89. params.Status = []string{compute_api.HOST_STATUS_RUNNING}
  90. params.HostStatus = []string{compute_api.HOST_ONLINE}
  91. params.Details = &boolFalse
  92. hosts := []compute_api.HostDetails{}
  93. for {
  94. offset := len(hosts)
  95. params.Offset = &offset
  96. resp, err := compute.Hosts.List(s, jsonutils.Marshal(params))
  97. if err != nil {
  98. return nil, errors.Wrapf(err, "Hosts.List")
  99. }
  100. for i := range resp.Data {
  101. host := compute_api.HostDetails{}
  102. err := resp.Data[i].Unmarshal(&host)
  103. if err != nil {
  104. return nil, errors.Wrapf(err, "Unmarshal")
  105. }
  106. hosts = append(hosts, host)
  107. }
  108. if len(hosts) >= resp.Total {
  109. break
  110. }
  111. }
  112. return hosts, nil
  113. }
  114. func CollectServiceMetrics(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  115. if options.Options.DisableServiceMetric {
  116. return
  117. }
  118. s := auth.GetAdminSession(ctx, options.Options.CommonOptions.Region)
  119. urls, err := tsdb.GetDefaultServiceSourceURLs(s, options.Options.SessionEndpointType)
  120. if err != nil {
  121. return
  122. }
  123. tk := auth.AdminCredential().GetTokenString()
  124. err = func() error {
  125. endpoints, err := getEndpoints(s)
  126. if err != nil {
  127. return errors.Wrapf(err, "getEndpoints")
  128. }
  129. metrics := []influxdb.SMetricData{}
  130. for _, ep := range endpoints {
  131. if utils.IsInStringArray(ep.ServiceType, apis.EXTERNAL_SERVICES) {
  132. continue
  133. }
  134. url := httputils.JoinPath(ep.Url, "version")
  135. hdr := http.Header{}
  136. hdr.Set("X-Auth-Token", tk)
  137. resp, err := httputils.Request(
  138. httputils.GetDefaultClient(),
  139. ctx, "GET",
  140. url, hdr, nil, false,
  141. )
  142. if err != nil {
  143. continue
  144. }
  145. defer resp.Body.Close()
  146. version, _ := io.ReadAll(resp.Body)
  147. if len(version) == 0 {
  148. continue
  149. }
  150. part, err := collectServiceMetrics(ctx, ep, string(version), tk)
  151. if err != nil {
  152. log.Errorf("collect service %s metric error: %v", ep.ServiceType, err)
  153. }
  154. metrics = append(metrics, part...)
  155. }
  156. if len(metrics) > 0 {
  157. err := influxdb.SendMetrics(urls, SYSTEM_METRIC_DATABASE, metrics, false)
  158. if err != nil {
  159. return errors.Wrapf(err, "SendMetrics")
  160. }
  161. }
  162. return nil
  163. }()
  164. if err != nil {
  165. log.Errorf("collect service metric error: %v", err)
  166. }
  167. {
  168. hosts, err := getHosts(s)
  169. if err != nil {
  170. log.Errorf("get hosts error: %v", err)
  171. }
  172. metrics := []influxdb.SMetricData{}
  173. for _, host := range hosts {
  174. part := collectHostMetrics(ctx, host, tk)
  175. metrics = append(metrics, part...)
  176. }
  177. if len(metrics) > 0 {
  178. err := influxdb.SendMetrics(urls, SYSTEM_METRIC_DATABASE, metrics, false)
  179. if err != nil {
  180. log.Errorf("send host metrics error: %v", err)
  181. }
  182. }
  183. }
  184. }
  185. func collectApiStatsMetrics(ctx context.Context, serviceName string, serviceType string, regionId string, url string, version, token string) ([]influxdb.SMetricData, error) {
  186. if len(url) == 0 || utils.IsInStringArray(serviceType, []string{"mcp-server"}) {
  187. return []influxdb.SMetricData{}, nil
  188. }
  189. log.Debugf("collectApiStatsMetrics %s %s %s %s %s", serviceName, serviceType, regionId, url, version)
  190. statsUrl := httputils.JoinPath(baseUrlF(url), "stats")
  191. hdr := http.Header{}
  192. hdr.Set("X-Auth-Token", token)
  193. _, ret, err := httputils.JSONRequest(
  194. httputils.GetDefaultClient(),
  195. ctx, "GET",
  196. statsUrl, hdr, nil, false,
  197. )
  198. if err != nil {
  199. return nil, errors.Wrapf(err, "request")
  200. }
  201. if gotypes.IsNil(ret) {
  202. return []influxdb.SMetricData{}, nil
  203. }
  204. stats := sApiHttpStats{}
  205. err = ret.Unmarshal(&stats)
  206. if err != nil {
  207. return nil, errors.Wrapf(err, "Unmarshal")
  208. }
  209. metrics := updateHttpStatsSnapshot(serviceName, url, time.Now(), stats, serviceType, regionId, version)
  210. return metrics, nil
  211. }
  212. func collectWorkerMetrics(ctx context.Context, url, service, serviceType, regionId, version, token string) ([]influxdb.SMetricData, error) {
  213. if len(url) == 0 || utils.IsInStringArray(serviceType, []string{"mcp-server"}) {
  214. return []influxdb.SMetricData{}, nil
  215. }
  216. statsUrl := httputils.JoinPath(baseUrlF(url), "worker_stats")
  217. hdr := http.Header{}
  218. hdr.Set("X-Auth-Token", token)
  219. _, ret, err := httputils.JSONRequest(
  220. httputils.GetDefaultClient(),
  221. ctx, "GET",
  222. statsUrl, hdr, nil, false,
  223. )
  224. if err != nil {
  225. return nil, errors.Wrapf(err, "request")
  226. }
  227. if gotypes.IsNil(ret) {
  228. return []influxdb.SMetricData{}, nil
  229. }
  230. workers := []struct {
  231. ActiveWorkerCnt int `json:"active_worker_cnt"`
  232. AllowOverflow bool `json:"allow_overflow"`
  233. Backlog int `json:"backlog"`
  234. DbWorker bool `json:"db_worker"`
  235. DetachWorkerCnt int `json:"detach_worker_cnt"`
  236. MaxWorkerCnt int `json:"max_worker_cnt"`
  237. Name string `json:"name"`
  238. QueueCnt int `json:"queue_cnt"`
  239. }{}
  240. err = ret.Unmarshal(&workers, "workers")
  241. if err != nil {
  242. return nil, errors.Wrapf(err, "Unmarshal")
  243. }
  244. result := []influxdb.SMetricData{}
  245. for _, worker := range workers {
  246. metric := influxdb.SMetricData{
  247. Name: METIRCY_TYPE_WORKER,
  248. Timestamp: time.Now(),
  249. Tags: []influxdb.SKeyValue{
  250. {
  251. Key: "version",
  252. Value: version,
  253. },
  254. {
  255. Key: "service",
  256. Value: service,
  257. },
  258. {
  259. Key: "service_type",
  260. Value: serviceType,
  261. },
  262. {
  263. Key: "region",
  264. Value: regionId,
  265. },
  266. {
  267. Key: "worker_name",
  268. Value: worker.Name,
  269. },
  270. },
  271. Metrics: []influxdb.SKeyValue{
  272. {
  273. Key: "active_worker_cnt",
  274. Value: fmt.Sprintf("%d", worker.ActiveWorkerCnt),
  275. },
  276. {
  277. Key: "max_worker_cnt",
  278. Value: fmt.Sprintf("%d", worker.MaxWorkerCnt),
  279. },
  280. {
  281. Key: "detach_worker_cnt",
  282. Value: fmt.Sprintf("%d", worker.DetachWorkerCnt),
  283. },
  284. {
  285. Key: "queue_cnt",
  286. Value: fmt.Sprintf("%d", worker.QueueCnt),
  287. },
  288. {
  289. Key: "total_workload",
  290. Value: fmt.Sprintf("%d", worker.ActiveWorkerCnt+worker.QueueCnt+worker.DetachWorkerCnt),
  291. },
  292. {
  293. Key: "active_workload",
  294. Value: fmt.Sprintf("%d", worker.ActiveWorkerCnt+worker.DetachWorkerCnt),
  295. },
  296. },
  297. }
  298. result = append(result, metric)
  299. }
  300. return result, nil
  301. }
  302. func collectDatabaseMetrics(ctx context.Context, ep api.EndpointDetails, version, token string) ([]influxdb.SMetricData, error) {
  303. if utils.IsInStringArray(ep.ServiceType, []string{"cloudmon", "webconsole", "k8s", "vpcagent", "yunionapi", "yunionagent", "mcp-server"}) {
  304. return []influxdb.SMetricData{}, nil
  305. }
  306. statsUrl := httputils.JoinPath(baseUrlF(ep.Url), "db_stats")
  307. hdr := http.Header{}
  308. hdr.Set("X-Auth-Token", token)
  309. _, ret, err := httputils.JSONRequest(
  310. httputils.GetDefaultClient(),
  311. ctx, "GET",
  312. statsUrl, hdr, nil, false,
  313. )
  314. if err != nil {
  315. return nil, errors.Wrapf(err, "request")
  316. }
  317. if gotypes.IsNil(ret) {
  318. return []influxdb.SMetricData{}, nil
  319. }
  320. stats := struct {
  321. Idle int `json:"idle"`
  322. InUse int `json:"in_use"`
  323. MaxIdleClosed int `json:"max_idle_closed"`
  324. MaxIdleTimeClosed int `json:"max_idle_time_closed"`
  325. MaxLifetimeClosed int `json:"max_lifetime_closed"`
  326. MaxOpenConnections int `json:"max_open_connections"`
  327. OpenConnections int `json:"open_connections"`
  328. WaitCount int `json:"wait_count"`
  329. WaitDuration int `json:"wait_duration"`
  330. }{}
  331. err = ret.Unmarshal(&stats, "db_stats")
  332. if err != nil {
  333. return nil, errors.Wrapf(err, "Unmarshal")
  334. }
  335. metric := influxdb.SMetricData{
  336. Name: METIRCY_TYPE_DB_STATS,
  337. Timestamp: time.Now(),
  338. Tags: []influxdb.SKeyValue{
  339. {
  340. Key: "version",
  341. Value: version,
  342. },
  343. {
  344. Key: "service",
  345. Value: ep.ServiceName,
  346. },
  347. },
  348. Metrics: []influxdb.SKeyValue{
  349. {
  350. Key: "idle",
  351. Value: fmt.Sprintf("%d", stats.Idle),
  352. },
  353. {
  354. Key: "in_use",
  355. Value: fmt.Sprintf("%d", stats.InUse),
  356. },
  357. {
  358. Key: "max_idle_closed",
  359. Value: fmt.Sprintf("%d", stats.MaxIdleClosed),
  360. },
  361. {
  362. Key: "max_idle_time_closed",
  363. Value: fmt.Sprintf("%d", stats.MaxIdleTimeClosed),
  364. },
  365. {
  366. Key: "max_lifetime_closed",
  367. Value: fmt.Sprintf("%d", stats.MaxLifetimeClosed),
  368. },
  369. {
  370. Key: "max_open_connections",
  371. Value: fmt.Sprintf("%d", stats.MaxOpenConnections),
  372. },
  373. {
  374. Key: "open_connections",
  375. Value: fmt.Sprintf("%d", stats.OpenConnections),
  376. },
  377. {
  378. Key: "wait_count",
  379. Value: fmt.Sprintf("%d", stats.WaitCount),
  380. },
  381. {
  382. Key: "wait_duration",
  383. Value: fmt.Sprintf("%d", stats.WaitDuration),
  384. },
  385. },
  386. }
  387. return []influxdb.SMetricData{metric}, nil
  388. }
  389. func collectProcessMetrics(ctx context.Context, ep api.EndpointDetails, version, token string) ([]influxdb.SMetricData, error) {
  390. if utils.IsInStringArray(ep.ServiceType, []string{"monitor", "webconsole", "k8s", "mcp-server"}) {
  391. return []influxdb.SMetricData{}, nil
  392. }
  393. statsUrl := httputils.JoinPath(baseUrlF(ep.Url), "process_stats")
  394. hdr := http.Header{}
  395. hdr.Set("X-Auth-Token", token)
  396. _, ret, err := httputils.JSONRequest(
  397. httputils.GetDefaultClient(),
  398. ctx, "GET",
  399. statsUrl, hdr, nil, false,
  400. )
  401. if err != nil {
  402. return nil, errors.Wrapf(err, "request")
  403. }
  404. if gotypes.IsNil(ret) {
  405. return []influxdb.SMetricData{}, nil
  406. }
  407. process := apis.ProcessStats{}
  408. err = ret.Unmarshal(&process, "process_stats")
  409. if err != nil {
  410. return nil, errors.Wrapf(err, "Unmarshal")
  411. }
  412. metric := influxdb.SMetricData{
  413. Name: METIRCY_TYPE_PROCESS,
  414. Timestamp: time.Now(),
  415. Tags: []influxdb.SKeyValue{
  416. {
  417. Key: "version",
  418. Value: version,
  419. },
  420. {
  421. Key: "service",
  422. Value: ep.ServiceName,
  423. },
  424. },
  425. Metrics: []influxdb.SKeyValue{
  426. {
  427. Key: "cpu_percent",
  428. Value: fmt.Sprintf("%.2f", process.CpuPercent),
  429. },
  430. {
  431. Key: "mem_percent",
  432. Value: fmt.Sprintf("%.2f", process.MemPercent),
  433. },
  434. {
  435. Key: "mem_size",
  436. Value: fmt.Sprintf("%d", process.MemSize),
  437. },
  438. {
  439. Key: "goroutine_num",
  440. Value: fmt.Sprintf("%d", process.GoroutineNum),
  441. },
  442. },
  443. }
  444. return []influxdb.SMetricData{metric}, nil
  445. }
  446. func baseUrlF(baseurl string) string {
  447. obj, _ := url.Parse(baseurl)
  448. lastSlashPos := strings.LastIndex(obj.Path, "/")
  449. if lastSlashPos >= 0 {
  450. lastSeg := obj.Path[lastSlashPos+1:]
  451. verReg := regexp.MustCompile(`^v\d+`)
  452. if verReg.MatchString(lastSeg) {
  453. obj.Path = obj.Path[:lastSlashPos]
  454. }
  455. }
  456. ret := obj.String()
  457. return ret
  458. }
  459. func collectServiceMetrics(ctx context.Context, ep api.EndpointDetails, version, token string) ([]influxdb.SMetricData, error) {
  460. ret, errs := []influxdb.SMetricData{}, []error{}
  461. stats, err := collectApiStatsMetrics(ctx, ep.ServiceName, ep.ServiceType, ep.RegionId, ep.Url, version, token)
  462. if err != nil {
  463. errs = append(errs, err)
  464. }
  465. ret = append(ret, stats...)
  466. worker, err := collectWorkerMetrics(ctx, ep.Url, ep.ServiceName, ep.ServiceType, ep.RegionId, version, token)
  467. if err != nil {
  468. errs = append(errs, err)
  469. }
  470. ret = append(ret, worker...)
  471. db, err := collectDatabaseMetrics(ctx, ep, version, token)
  472. if err != nil {
  473. errs = append(errs, err)
  474. }
  475. ret = append(ret, db...)
  476. process, err := collectProcessMetrics(ctx, ep, version, token)
  477. if err != nil {
  478. errs = append(errs, err)
  479. }
  480. ret = append(ret, process...)
  481. return ret, errors.NewAggregate(errs)
  482. }
  483. func collectHostMetrics(ctx context.Context, host compute_api.HostDetails, token string) []influxdb.SMetricData {
  484. metrics := []influxdb.SMetricData{}
  485. service := fmt.Sprintf("host-%s", host.Name)
  486. part, err := collectWorkerMetrics(ctx, host.ManagerUri, service, "host", host.Region, host.Version, token)
  487. if err != nil {
  488. log.Errorf("collect host %s metric error: %v", service, err)
  489. } else {
  490. metrics = append(metrics, part...)
  491. }
  492. part, err = collectApiStatsMetrics(ctx, service, "host", host.Region, host.ManagerUri, host.Version, token)
  493. if err != nil {
  494. log.Errorf("collect host %s metric error: %v", service, err)
  495. } else {
  496. metrics = append(metrics, part...)
  497. }
  498. return metrics
  499. }