telegraf.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package system_service
  15. import (
  16. "context"
  17. "fmt"
  18. "net/url"
  19. "os"
  20. "path/filepath"
  21. "sort"
  22. "strings"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/pkg/util/httputils"
  26. "yunion.io/x/onecloud/pkg/apis"
  27. "yunion.io/x/onecloud/pkg/util/procutils"
  28. )
  // Names of the optional telegraf input plugins and of the option keys that
  // GetConfig looks up in its kwargs.
  const (
  	// TELEGRAF_INPUT_RADEONTOP is both the kwargs key that enables the
  	// radeontop input and the name used for its [[inputs.<name>]] section.
  	TELEGRAF_INPUT_RADEONTOP = "radeontop"
  	// TELEGRAF_INPUT_RADEONTOP_DEV_PATHS is the key of the device path list
  	// inside the radeontop options and the option name written to the config.
  	TELEGRAF_INPUT_RADEONTOP_DEV_PATHS = "device_paths"
  	// TELEGRAF_INPUT_CONF_BIN_PATH is the key under which the radeontop,
  	// ni_rsrc_mon and vasmi options carry their bin_path value.
  	TELEGRAF_INPUT_CONF_BIN_PATH = "bin_path"
  	// TELEGRAF_INPUT_NETDEV is both the kwargs key that enables the
  	// ni_rsrc_mon input and the name used for its [[inputs.<name>]] section.
  	TELEGRAF_INPUT_NETDEV = "ni_rsrc_mon"
  	// TELEGRAF_INPUT_VASMI is both the kwargs key that enables the vasmi
  	// input and the name used for its [[inputs.<name>]] section.
  	TELEGRAF_INPUT_VASMI = "vasmi"
  	// TELEGRAF_INPUT_NVIDIASMI is the kwargs key that enables the nvidia_smi
  	// input; only the presence of the key is checked by GetConfig.
  	TELEGRAF_INPUT_NVIDIASMI = "nvidia-smi"
  )
  // STelegraf is the system service wrapper for telegraf. It embeds
  // *SBaseSystemService and adds telegraf-specific configuration rendering and
  // reload helpers.
  type STelegraf struct {
  	*SBaseSystemService
  }
  40. func NewTelegrafService() *STelegraf {
  41. return &STelegraf{NewBaseSystemService("telegraf", nil)}
  42. }
  43. func (s *STelegraf) GetConfig(kwargs map[string]interface{}) string {
  44. conf := ""
  45. conf += "[global_tags]\n"
  46. if tags, ok := kwargs["tags"]; ok {
  47. tgs, _ := tags.(map[string]string)
  48. keys := []string{}
  49. for k := range tgs {
  50. keys = append(keys, k)
  51. }
  52. sort.Strings(keys)
  53. for _, k := range keys {
  54. conf += fmt.Sprintf(" %s = \"%s\"\n", k, tgs[k])
  55. }
  56. }
  57. conf += "\n"
  58. conf += "[agent]\n"
  59. intVal := 60
  60. if v, ok := kwargs["interval"]; ok {
  61. intvalInt, _ := v.(int)
  62. if intvalInt > 0 {
  63. intVal = intvalInt
  64. }
  65. }
  66. conf += fmt.Sprintf(" interval = \"%ds\"\n", intVal)
  67. conf += " round_interval = true\n"
  68. conf += " metric_batch_size = 1000\n"
  69. conf += " metric_buffer_limit = 10000\n"
  70. conf += " collection_jitter = \"0s\"\n"
  71. conf += " flush_interval = \"60s\"\n"
  72. conf += " flush_jitter = \"0s\"\n"
  73. conf += " precision = \"\"\n"
  74. conf += " debug = false\n"
  75. conf += " quiet = false\n"
  76. // conf += " logfile = \"/var/log/telegraf/telegraf.err.log\"\n"
  77. var hostname string
  78. if hn, ok := kwargs["hostname"]; ok {
  79. hostname, _ = hn.(string)
  80. }
  81. conf += fmt.Sprintf(" hostname = \"%s\"\n", hostname)
  82. conf += " omit_hostname = false\n"
  83. conf += "\n"
  84. if influx, ok := kwargs[apis.SERVICE_TYPE_INFLUXDB]; ok {
  85. influxdb, _ := influx.(map[string]interface{})
  86. inUrls, _ := influxdb["url"]
  87. tUrls, _ := inUrls.([]string)
  88. inDatabase, _ := influxdb["database"]
  89. isVM := false
  90. if tsdbType, ok := influxdb["tsdb_type"]; ok {
  91. if tsdbType.(string) == apis.SERVICE_TYPE_VICTORIA_METRICS {
  92. isVM = true
  93. }
  94. }
  95. tdb, _ := inDatabase.(string)
  96. urls := []string{}
  97. for _, u := range tUrls {
  98. urls = append(urls, fmt.Sprintf("\"%s\"", u))
  99. }
  100. conf += "[[outputs.influxdb]]\n"
  101. conf += fmt.Sprintf(" urls = [%s]\n", strings.Join(urls, ", "))
  102. conf += fmt.Sprintf(" database = \"%s\"\n", tdb)
  103. conf += " insecure_skip_verify = true\n"
  104. if isVM {
  105. conf += " skip_database_creation = true\n"
  106. }
  107. conf += " timeout = \"30s\"\n"
  108. conf += "\n"
  109. }
  110. /*
  111. *
  112. * [[outputs.kafka]]
  113. * ## URLs of kafka brokers
  114. * brokers = ["localhost:9092"]
  115. * ## Kafka topic for producer messages
  116. * topic = "telegraf"
  117. * ## Optional SASL Config
  118. * sasl_username = "kafka"
  119. * sasl_password = "secret"
  120. * ## Optional SASL:
  121. * ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
  122. * ## (defaults to PLAIN)
  123. * sasl_mechanism = "PLAIN"
  124. */
  125. if kafka, ok := kwargs["kafka"]; ok {
  126. kafkaConf, _ := kafka.(map[string]interface{})
  127. conf += "[[outputs.kafka]]\n"
  128. for _, k := range []string{
  129. "brokers",
  130. "topic",
  131. "sasl_username",
  132. "sasl_password",
  133. "sasl_mechanism",
  134. } {
  135. if val, ok := kafkaConf[k]; ok {
  136. if k == "brokers" {
  137. brokers, _ := val.([]string)
  138. for i := range brokers {
  139. brokers[i] = fmt.Sprintf("\"%s\"", brokers[i])
  140. }
  141. conf += fmt.Sprintf(" brokers = [%s]\n", strings.Join(brokers, ", "))
  142. } else {
  143. conf += fmt.Sprintf(" %s = \"%s\"\n", k, val)
  144. }
  145. }
  146. }
  147. conf += " compression_codec = 0\n"
  148. conf += " required_acks = -1\n"
  149. conf += " max_retry = 3\n"
  150. conf += " data_format = \"json\"\n"
  151. conf += " json_timestamp_units = \"1ms\"\n"
  152. conf += " routing_tag = \"host\"\n"
  153. conf += "\n"
  154. }
  155. /*
  156. * [[outputs.opentsdb]]
  157. * host = "http://127.0.0.1"
  158. * port = 17000
  159. * http_batch_size = 50
  160. * http_path = "/opentsdb/put"
  161. * debug = false
  162. * separator = "_"
  163. */
  164. if opentsdb, ok := kwargs["opentsdb"]; ok {
  165. opentsdbConf, _ := opentsdb.(map[string]interface{})
  166. urlstr := opentsdbConf["url"].(string)
  167. urlParts, err := url.Parse(urlstr)
  168. if err != nil {
  169. log.Errorf("malformed opentsdb url: %s: %s", urlstr, err)
  170. } else {
  171. port := urlParts.Port()
  172. if len(port) == 0 {
  173. if urlParts.Scheme == "http" {
  174. port = "80"
  175. } else if urlParts.Scheme == "https" {
  176. port = "443"
  177. }
  178. }
  179. conf += "[[outputs.opentsdb]]\n"
  180. conf += fmt.Sprintf(" host = \"%s://%s\"\n", urlParts.Scheme, urlParts.Hostname())
  181. conf += fmt.Sprintf(" port = %s\n", urlParts.Port())
  182. conf += " http_batch_size = 50\n"
  183. conf += fmt.Sprintf(" http_path = \"%s\"\n", urlParts.Path)
  184. conf += " debug = false\n"
  185. conf += " separator = \"_\"\n"
  186. conf += "\n"
  187. }
  188. }
  189. conf += "[[inputs.cpu]]\n"
  190. conf += " percpu = true\n"
  191. conf += " totalcpu = true\n"
  192. conf += " collect_cpu_time = false\n"
  193. conf += " report_active = true\n"
  194. conf += "\n"
  195. conf += "[[inputs.disk]]\n"
  196. ignoreMountPoints := []string{
  197. "/etc/telegraf",
  198. "/etc/hosts",
  199. "/etc/hostname",
  200. "/etc/resolv.conf",
  201. "/dev/termination-log",
  202. }
  203. for i := range ignoreMountPoints {
  204. ignoreMountPoints[i] = fmt.Sprintf("%q", ignoreMountPoints[i])
  205. }
  206. ignorePathSegments := []string{
  207. "/run/k3s/containerd/",
  208. "/run/onecloud/containerd/",
  209. "/var/lib/",
  210. }
  211. if sp, ok := kwargs["server_path"]; ok {
  212. ignorePathSegments = append(ignorePathSegments, sp.(string))
  213. }
  214. for i := range ignorePathSegments {
  215. ignorePathSegments[i] = fmt.Sprintf("%q", ignorePathSegments[i])
  216. }
  217. conf += " ignore_mount_points = [" + strings.Join(ignoreMountPoints, ", ") + "]\n"
  218. conf += " ignore_path_segments = [" + strings.Join(ignorePathSegments, ", ") + "]\n"
  219. conf += " ignore_fs = [\"devtmpfs\", \"devfs\", \"overlayfs\", \"overlay\", \"squashfs\", \"iso9660\", \"rootfs\", \"hugetlbfs\", \"autofs\", \"aufs\"]\n"
  220. conf += "\n"
  221. conf += "[[inputs.diskio]]\n"
  222. conf += " skip_serial_number = false\n"
  223. conf += " excludes = \"^(nbd|loop)\"\n"
  224. conf += "\n"
  225. conf += "[[inputs.kernel]]\n"
  226. conf += "\n"
  227. conf += "[[inputs.kernel_vmstat]]\n"
  228. conf += "\n"
  229. conf += "[[inputs.mem]]\n"
  230. conf += "\n"
  231. conf += "[[inputs.processes]]\n"
  232. conf += "\n"
  233. conf += "[[inputs.swap]]\n"
  234. conf += "\n"
  235. conf += "[[inputs.system]]\n"
  236. conf += "\n"
  237. conf += "[[inputs.smart]]\n"
  238. conf += " path=\"/usr/sbin/smartctl\"\n"
  239. conf += "\n"
  240. conf += "[[inputs.sensors]]\n"
  241. conf += "\n"
  242. conf += "[[inputs.net]]\n"
  243. if nics, ok := kwargs["nics"]; ok {
  244. ns, _ := nics.([]map[string]interface{})
  245. infs := []string{}
  246. for _, n := range ns {
  247. iname, _ := n["name"]
  248. name, _ := iname.(string)
  249. infs = append(infs, fmt.Sprintf("\"%s\"", name))
  250. }
  251. conf += fmt.Sprintf(" interfaces = [%s]\n", strings.Join(infs, ", "))
  252. conf += "\n"
  253. for _, n := range ns {
  254. name, _ := n["name"].(string)
  255. alias, _ := n["alias"].(string)
  256. speed, _ := n["speed"].(int)
  257. conf += " [[inputs.net.interface_conf]]\n"
  258. conf += fmt.Sprintf(" name = \"%s\"\n", name)
  259. conf += fmt.Sprintf(" alias = \"%s\"\n", alias)
  260. conf += fmt.Sprintf(" speed = %d\n", speed)
  261. conf += "\n"
  262. }
  263. }
  264. conf += "[[inputs.netstat]]\n"
  265. conf += "\n"
  266. conf += "[[inputs.bond]]\n"
  267. conf += "\n"
  268. conf += "[[inputs.temp]]\n"
  269. conf += "\n"
  270. conf += "[[inputs.nstat]]\n"
  271. conf += "\n"
  272. conf += "[[inputs.ntpq]]\n"
  273. conf += " dns_lookup = false\n"
  274. conf += "\n"
  275. if pidFile, ok := kwargs["pid_file"]; ok {
  276. pf, _ := pidFile.(string)
  277. conf += "[[inputs.procstat]]\n"
  278. conf += fmt.Sprintf(" pid_file = \"%s\"\n", pf)
  279. conf += "\n"
  280. }
  281. conf += "[[inputs.internal]]\n"
  282. conf += " collect_memstats = false\n"
  283. conf += "\n"
  284. conf += "[[inputs.linux_sysctl_fs]]\n"
  285. conf += "\n"
  286. conf += "[[inputs.http_listener_v2]]\n"
  287. conf += " service_address = \"localhost:8087\"\n"
  288. conf += " path = \"/write\"\n"
  289. conf += " paths = [\"/write\"]\n" // Compatible with telegraf v1.36
  290. conf += " data_source = \"body\"\n"
  291. conf += " data_format = \"influx\"\n"
  292. conf += "\n"
  293. if haproxyConf, ok := kwargs["haproxy"]; ok {
  294. haproxyConfMap, _ := haproxyConf.(map[string]interface{})
  295. haIntVal, _ := haproxyConfMap["interval"].(int)
  296. statsSocket, _ := haproxyConfMap["stats_socket_path"].(string)
  297. conf += "[[inputs.haproxy]]\n"
  298. conf += fmt.Sprintf(" interval = \"%ds\"\n", haIntVal)
  299. conf += fmt.Sprintf(" servers = [\"%s\"]\n", statsSocket)
  300. conf += " keep_field_names = true\n"
  301. conf += "\n"
  302. }
  303. if radontop, ok := kwargs[TELEGRAF_INPUT_RADEONTOP]; ok {
  304. radontopMap, _ := radontop.(map[string]interface{})
  305. devPaths := radontopMap[TELEGRAF_INPUT_RADEONTOP_DEV_PATHS].([]string)
  306. devPathStr := make([]string, len(devPaths))
  307. for i, devPath := range devPaths {
  308. devPathStr[i] = fmt.Sprintf("\"%s\"", devPath)
  309. }
  310. conf += fmt.Sprintf("[[inputs.%s]]\n", TELEGRAF_INPUT_RADEONTOP)
  311. conf += fmt.Sprintf(" bin_path = \"%s\"\n", radontopMap[TELEGRAF_INPUT_CONF_BIN_PATH].(string))
  312. conf += fmt.Sprintf(" %s = [%s]\n", TELEGRAF_INPUT_RADEONTOP_DEV_PATHS, strings.Join(devPathStr, ", "))
  313. conf += "\n"
  314. }
  315. if netdev, ok := kwargs[TELEGRAF_INPUT_NETDEV]; ok {
  316. netdevMap, _ := netdev.(map[string]interface{})
  317. conf += fmt.Sprintf("[[inputs.%s]]\n", TELEGRAF_INPUT_NETDEV)
  318. conf += fmt.Sprintf(" bin_path = \"%s\"\n", netdevMap[TELEGRAF_INPUT_CONF_BIN_PATH].(string))
  319. conf += "\n"
  320. }
  321. if vasmi, ok := kwargs[TELEGRAF_INPUT_VASMI]; ok {
  322. vasmiMap, _ := vasmi.(map[string]interface{})
  323. conf += fmt.Sprintf("[[inputs.%s]]\n", TELEGRAF_INPUT_VASMI)
  324. conf += fmt.Sprintf(" bin_path = \"%s\"\n", vasmiMap[TELEGRAF_INPUT_CONF_BIN_PATH].(string))
  325. conf += "\n"
  326. }
  327. if _, ok := kwargs[TELEGRAF_INPUT_NVIDIASMI]; ok {
  328. conf += "[[inputs.nvidia_smi]]\n"
  329. conf += "\n"
  330. }
  331. // 检查 IPMI 设备文件是否存在
  332. if hasIPMIDevice() {
  333. conf += "[[inputs.ipmi_sensor]]\n"
  334. conf += " metric_version = 2\n"
  335. conf += " sensors = [\"sdr\"]\n"
  336. conf += "\n"
  337. }
  338. return conf
  339. }
  340. func (s *STelegraf) GetConfigFile() string {
  341. dir := getTelegrafConfigDir()
  342. procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", dir).Run()
  343. return filepath.Join(dir, "telegraf.conf")
  344. }
  // getTelegrafConfigDir returns the directory holding the telegraf
  // configuration: the value of the HOST_TELEGRAF_CONFIG_DIR environment
  // variable when it is non-empty, "/etc/telegraf" otherwise.
  func getTelegrafConfigDir() string {
  	if dir := os.Getenv("HOST_TELEGRAF_CONFIG_DIR"); dir != "" {
  		return dir
  	}
  	return "/etc/telegraf"
  }
  353. func GetTelegrafConfDDir() string {
  354. dir := getTelegrafConfigDir()
  355. return filepath.Join(dir, "telegraf.d")
  356. }
  357. func (s *STelegraf) Reload(kwargs map[string]interface{}) error {
  358. return s.reload(s.GetConfig(kwargs), s.GetConfigFile())
  359. }
  360. func (s *STelegraf) BgReload(kwargs map[string]interface{}) {
  361. go s.reload(s.GetConfig(kwargs), s.GetConfigFile())
  362. }
  363. func (s *STelegraf) BgReloadConf(kwargs map[string]interface{}) {
  364. go func() {
  365. reload, err := s.reloadConf(s.GetConfig(kwargs), s.GetConfigFile())
  366. if err != nil {
  367. log.Errorf("Failed reload conf: %s", err)
  368. }
  369. if reload {
  370. err := s.ReloadTelegraf()
  371. if err != nil {
  372. log.Errorf("failed reload telegraf: %s", err)
  373. }
  374. }
  375. }()
  376. }
  377. func (s *STelegraf) ReloadTelegraf() error {
  378. log.Infof("Start reloading telegraf...")
  379. errs := []error{}
  380. if err := s.reloadTelegrafByHTTP(); err != nil {
  381. errs = append(errs, errors.Wrap(err, "reloadTelegrafByHTTP"))
  382. return errors.NewAggregate(errs)
  383. }
  384. log.Infof("Finish reloading telegraf")
  385. return nil
  386. }
  387. func (s *STelegraf) reloadTelegrafByDocker() error {
  388. log.Infof("Reloading telegraf by docker...")
  389. output, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", "/usr/bin/docker ps --filter 'label=io.kubernetes.container.name=telegraf' --format '{{.ID}}'").Output()
  390. if err != nil {
  391. return errors.Wrap(err, "using docker ps find telegraf container")
  392. }
  393. id := strings.TrimSpace(string(output))
  394. if len(id) == 0 {
  395. return errors.Errorf("not found telegraf running container")
  396. }
  397. if err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", "/usr/bin/docker restart "+id).Run(); err != nil {
  398. return errors.Wrapf(err, "restart telegraf container %q", id)
  399. }
  400. return nil
  401. }
  402. func (s *STelegraf) reloadTelegrafByHTTP() error {
  403. telegrafReoladUrl := "http://localhost:8087/reload"
  404. log.Infof("Reloading telegraf by %q ...", telegrafReoladUrl)
  405. if _, _, err := httputils.JSONRequest(
  406. httputils.GetDefaultClient(), context.Background(),
  407. "POST", telegrafReoladUrl, nil, nil, false,
  408. ); err != nil {
  409. return errors.Wrap(err, "reload telegraf by http reload api")
  410. }
  411. return nil
  412. }
  // hasIPMIDevice reports whether any of the known IPMI device files
  // (/dev/ipmi0, /dev/ipmi/0, /dev/ipmidev/0) can be stat'ed successfully.
  func hasIPMIDevice() bool {
  	for _, path := range [...]string{
  		"/dev/ipmi0",
  		"/dev/ipmi/0",
  		"/dev/ipmidev/0",
  	} {
  		_, err := os.Stat(path)
  		if err != nil {
  			continue
  		}
  		return true
  	}
  	return false
  }