| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package system_service
- import (
- "context"
- "fmt"
- "net/url"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/onecloud/pkg/apis"
- "yunion.io/x/onecloud/pkg/util/procutils"
- )
// kwargs keys that switch on optional accelerator/device inputs in GetConfig.
// Apart from TELEGRAF_INPUT_NVIDIASMI (whose section is written as
// [[inputs.nvidia_smi]]), the same string is also the input name used in the
// generated "[[inputs.<name>]]" header.
const (
	TELEGRAF_INPUT_NVIDIASMI = "nvidia-smi"
	TELEGRAF_INPUT_RADEONTOP = "radeontop"
	TELEGRAF_INPUT_VASMI     = "vasmi"
	TELEGRAF_INPUT_NETDEV    = "ni_rsrc_mon"
)

// Option keys looked up inside the per-input kwargs maps above; they are also
// emitted verbatim as TOML option names.
const (
	TELEGRAF_INPUT_CONF_BIN_PATH       = "bin_path"
	TELEGRAF_INPUT_RADEONTOP_DEV_PATHS = "device_paths"
)
- type STelegraf struct {
- *SBaseSystemService
- }
- func NewTelegrafService() *STelegraf {
- return &STelegraf{NewBaseSystemService("telegraf", nil)}
- }
- func (s *STelegraf) GetConfig(kwargs map[string]interface{}) string {
- conf := ""
- conf += "[global_tags]\n"
- if tags, ok := kwargs["tags"]; ok {
- tgs, _ := tags.(map[string]string)
- keys := []string{}
- for k := range tgs {
- keys = append(keys, k)
- }
- sort.Strings(keys)
- for _, k := range keys {
- conf += fmt.Sprintf(" %s = \"%s\"\n", k, tgs[k])
- }
- }
- conf += "\n"
- conf += "[agent]\n"
- intVal := 60
- if v, ok := kwargs["interval"]; ok {
- intvalInt, _ := v.(int)
- if intvalInt > 0 {
- intVal = intvalInt
- }
- }
- conf += fmt.Sprintf(" interval = \"%ds\"\n", intVal)
- conf += " round_interval = true\n"
- conf += " metric_batch_size = 1000\n"
- conf += " metric_buffer_limit = 10000\n"
- conf += " collection_jitter = \"0s\"\n"
- conf += " flush_interval = \"60s\"\n"
- conf += " flush_jitter = \"0s\"\n"
- conf += " precision = \"\"\n"
- conf += " debug = false\n"
- conf += " quiet = false\n"
- // conf += " logfile = \"/var/log/telegraf/telegraf.err.log\"\n"
- var hostname string
- if hn, ok := kwargs["hostname"]; ok {
- hostname, _ = hn.(string)
- }
- conf += fmt.Sprintf(" hostname = \"%s\"\n", hostname)
- conf += " omit_hostname = false\n"
- conf += "\n"
- if influx, ok := kwargs[apis.SERVICE_TYPE_INFLUXDB]; ok {
- influxdb, _ := influx.(map[string]interface{})
- inUrls, _ := influxdb["url"]
- tUrls, _ := inUrls.([]string)
- inDatabase, _ := influxdb["database"]
- isVM := false
- if tsdbType, ok := influxdb["tsdb_type"]; ok {
- if tsdbType.(string) == apis.SERVICE_TYPE_VICTORIA_METRICS {
- isVM = true
- }
- }
- tdb, _ := inDatabase.(string)
- urls := []string{}
- for _, u := range tUrls {
- urls = append(urls, fmt.Sprintf("\"%s\"", u))
- }
- conf += "[[outputs.influxdb]]\n"
- conf += fmt.Sprintf(" urls = [%s]\n", strings.Join(urls, ", "))
- conf += fmt.Sprintf(" database = \"%s\"\n", tdb)
- conf += " insecure_skip_verify = true\n"
- if isVM {
- conf += " skip_database_creation = true\n"
- }
- conf += " timeout = \"30s\"\n"
- conf += "\n"
- }
- /*
- *
- * [[outputs.kafka]]
- * ## URLs of kafka brokers
- * brokers = ["localhost:9092"]
- * ## Kafka topic for producer messages
- * topic = "telegraf"
- * ## Optional SASL Config
- * sasl_username = "kafka"
- * sasl_password = "secret"
- * ## Optional SASL:
- * ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
- * ## (defaults to PLAIN)
- * sasl_mechanism = "PLAIN"
- */
- if kafka, ok := kwargs["kafka"]; ok {
- kafkaConf, _ := kafka.(map[string]interface{})
- conf += "[[outputs.kafka]]\n"
- for _, k := range []string{
- "brokers",
- "topic",
- "sasl_username",
- "sasl_password",
- "sasl_mechanism",
- } {
- if val, ok := kafkaConf[k]; ok {
- if k == "brokers" {
- brokers, _ := val.([]string)
- for i := range brokers {
- brokers[i] = fmt.Sprintf("\"%s\"", brokers[i])
- }
- conf += fmt.Sprintf(" brokers = [%s]\n", strings.Join(brokers, ", "))
- } else {
- conf += fmt.Sprintf(" %s = \"%s\"\n", k, val)
- }
- }
- }
- conf += " compression_codec = 0\n"
- conf += " required_acks = -1\n"
- conf += " max_retry = 3\n"
- conf += " data_format = \"json\"\n"
- conf += " json_timestamp_units = \"1ms\"\n"
- conf += " routing_tag = \"host\"\n"
- conf += "\n"
- }
- /*
- * [[outputs.opentsdb]]
- * host = "http://127.0.0.1"
- * port = 17000
- * http_batch_size = 50
- * http_path = "/opentsdb/put"
- * debug = false
- * separator = "_"
- */
- if opentsdb, ok := kwargs["opentsdb"]; ok {
- opentsdbConf, _ := opentsdb.(map[string]interface{})
- urlstr := opentsdbConf["url"].(string)
- urlParts, err := url.Parse(urlstr)
- if err != nil {
- log.Errorf("malformed opentsdb url: %s: %s", urlstr, err)
- } else {
- port := urlParts.Port()
- if len(port) == 0 {
- if urlParts.Scheme == "http" {
- port = "80"
- } else if urlParts.Scheme == "https" {
- port = "443"
- }
- }
- conf += "[[outputs.opentsdb]]\n"
- conf += fmt.Sprintf(" host = \"%s://%s\"\n", urlParts.Scheme, urlParts.Hostname())
- conf += fmt.Sprintf(" port = %s\n", urlParts.Port())
- conf += " http_batch_size = 50\n"
- conf += fmt.Sprintf(" http_path = \"%s\"\n", urlParts.Path)
- conf += " debug = false\n"
- conf += " separator = \"_\"\n"
- conf += "\n"
- }
- }
- conf += "[[inputs.cpu]]\n"
- conf += " percpu = true\n"
- conf += " totalcpu = true\n"
- conf += " collect_cpu_time = false\n"
- conf += " report_active = true\n"
- conf += "\n"
- conf += "[[inputs.disk]]\n"
- ignoreMountPoints := []string{
- "/etc/telegraf",
- "/etc/hosts",
- "/etc/hostname",
- "/etc/resolv.conf",
- "/dev/termination-log",
- }
- for i := range ignoreMountPoints {
- ignoreMountPoints[i] = fmt.Sprintf("%q", ignoreMountPoints[i])
- }
- ignorePathSegments := []string{
- "/run/k3s/containerd/",
- "/run/onecloud/containerd/",
- "/var/lib/",
- }
- if sp, ok := kwargs["server_path"]; ok {
- ignorePathSegments = append(ignorePathSegments, sp.(string))
- }
- for i := range ignorePathSegments {
- ignorePathSegments[i] = fmt.Sprintf("%q", ignorePathSegments[i])
- }
- conf += " ignore_mount_points = [" + strings.Join(ignoreMountPoints, ", ") + "]\n"
- conf += " ignore_path_segments = [" + strings.Join(ignorePathSegments, ", ") + "]\n"
- conf += " ignore_fs = [\"devtmpfs\", \"devfs\", \"overlayfs\", \"overlay\", \"squashfs\", \"iso9660\", \"rootfs\", \"hugetlbfs\", \"autofs\", \"aufs\"]\n"
- conf += "\n"
- conf += "[[inputs.diskio]]\n"
- conf += " skip_serial_number = false\n"
- conf += " excludes = \"^(nbd|loop)\"\n"
- conf += "\n"
- conf += "[[inputs.kernel]]\n"
- conf += "\n"
- conf += "[[inputs.kernel_vmstat]]\n"
- conf += "\n"
- conf += "[[inputs.mem]]\n"
- conf += "\n"
- conf += "[[inputs.processes]]\n"
- conf += "\n"
- conf += "[[inputs.swap]]\n"
- conf += "\n"
- conf += "[[inputs.system]]\n"
- conf += "\n"
- conf += "[[inputs.smart]]\n"
- conf += " path=\"/usr/sbin/smartctl\"\n"
- conf += "\n"
- conf += "[[inputs.sensors]]\n"
- conf += "\n"
- conf += "[[inputs.net]]\n"
- if nics, ok := kwargs["nics"]; ok {
- ns, _ := nics.([]map[string]interface{})
- infs := []string{}
- for _, n := range ns {
- iname, _ := n["name"]
- name, _ := iname.(string)
- infs = append(infs, fmt.Sprintf("\"%s\"", name))
- }
- conf += fmt.Sprintf(" interfaces = [%s]\n", strings.Join(infs, ", "))
- conf += "\n"
- for _, n := range ns {
- name, _ := n["name"].(string)
- alias, _ := n["alias"].(string)
- speed, _ := n["speed"].(int)
- conf += " [[inputs.net.interface_conf]]\n"
- conf += fmt.Sprintf(" name = \"%s\"\n", name)
- conf += fmt.Sprintf(" alias = \"%s\"\n", alias)
- conf += fmt.Sprintf(" speed = %d\n", speed)
- conf += "\n"
- }
- }
- conf += "[[inputs.netstat]]\n"
- conf += "\n"
- conf += "[[inputs.bond]]\n"
- conf += "\n"
- conf += "[[inputs.temp]]\n"
- conf += "\n"
- conf += "[[inputs.nstat]]\n"
- conf += "\n"
- conf += "[[inputs.ntpq]]\n"
- conf += " dns_lookup = false\n"
- conf += "\n"
- if pidFile, ok := kwargs["pid_file"]; ok {
- pf, _ := pidFile.(string)
- conf += "[[inputs.procstat]]\n"
- conf += fmt.Sprintf(" pid_file = \"%s\"\n", pf)
- conf += "\n"
- }
- conf += "[[inputs.internal]]\n"
- conf += " collect_memstats = false\n"
- conf += "\n"
- conf += "[[inputs.linux_sysctl_fs]]\n"
- conf += "\n"
- conf += "[[inputs.http_listener_v2]]\n"
- conf += " service_address = \"localhost:8087\"\n"
- conf += " path = \"/write\"\n"
- conf += " paths = [\"/write\"]\n" // Compatible with telegraf v1.36
- conf += " data_source = \"body\"\n"
- conf += " data_format = \"influx\"\n"
- conf += "\n"
- if haproxyConf, ok := kwargs["haproxy"]; ok {
- haproxyConfMap, _ := haproxyConf.(map[string]interface{})
- haIntVal, _ := haproxyConfMap["interval"].(int)
- statsSocket, _ := haproxyConfMap["stats_socket_path"].(string)
- conf += "[[inputs.haproxy]]\n"
- conf += fmt.Sprintf(" interval = \"%ds\"\n", haIntVal)
- conf += fmt.Sprintf(" servers = [\"%s\"]\n", statsSocket)
- conf += " keep_field_names = true\n"
- conf += "\n"
- }
- if radontop, ok := kwargs[TELEGRAF_INPUT_RADEONTOP]; ok {
- radontopMap, _ := radontop.(map[string]interface{})
- devPaths := radontopMap[TELEGRAF_INPUT_RADEONTOP_DEV_PATHS].([]string)
- devPathStr := make([]string, len(devPaths))
- for i, devPath := range devPaths {
- devPathStr[i] = fmt.Sprintf("\"%s\"", devPath)
- }
- conf += fmt.Sprintf("[[inputs.%s]]\n", TELEGRAF_INPUT_RADEONTOP)
- conf += fmt.Sprintf(" bin_path = \"%s\"\n", radontopMap[TELEGRAF_INPUT_CONF_BIN_PATH].(string))
- conf += fmt.Sprintf(" %s = [%s]\n", TELEGRAF_INPUT_RADEONTOP_DEV_PATHS, strings.Join(devPathStr, ", "))
- conf += "\n"
- }
- if netdev, ok := kwargs[TELEGRAF_INPUT_NETDEV]; ok {
- netdevMap, _ := netdev.(map[string]interface{})
- conf += fmt.Sprintf("[[inputs.%s]]\n", TELEGRAF_INPUT_NETDEV)
- conf += fmt.Sprintf(" bin_path = \"%s\"\n", netdevMap[TELEGRAF_INPUT_CONF_BIN_PATH].(string))
- conf += "\n"
- }
- if vasmi, ok := kwargs[TELEGRAF_INPUT_VASMI]; ok {
- vasmiMap, _ := vasmi.(map[string]interface{})
- conf += fmt.Sprintf("[[inputs.%s]]\n", TELEGRAF_INPUT_VASMI)
- conf += fmt.Sprintf(" bin_path = \"%s\"\n", vasmiMap[TELEGRAF_INPUT_CONF_BIN_PATH].(string))
- conf += "\n"
- }
- if _, ok := kwargs[TELEGRAF_INPUT_NVIDIASMI]; ok {
- conf += "[[inputs.nvidia_smi]]\n"
- conf += "\n"
- }
- // 检查 IPMI 设备文件是否存在
- if hasIPMIDevice() {
- conf += "[[inputs.ipmi_sensor]]\n"
- conf += " metric_version = 2\n"
- conf += " sensors = [\"sdr\"]\n"
- conf += "\n"
- }
- return conf
- }
- func (s *STelegraf) GetConfigFile() string {
- dir := getTelegrafConfigDir()
- procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", dir).Run()
- return filepath.Join(dir, "telegraf.conf")
- }
// getTelegrafConfigDir returns $HOST_TELEGRAF_CONFIG_DIR when it is set to a
// non-empty value, and /etc/telegraf otherwise.
func getTelegrafConfigDir() string {
	if dir := os.Getenv("HOST_TELEGRAF_CONFIG_DIR"); dir != "" {
		return dir
	}
	return "/etc/telegraf"
}
- func GetTelegrafConfDDir() string {
- dir := getTelegrafConfigDir()
- return filepath.Join(dir, "telegraf.d")
- }
- func (s *STelegraf) Reload(kwargs map[string]interface{}) error {
- return s.reload(s.GetConfig(kwargs), s.GetConfigFile())
- }
- func (s *STelegraf) BgReload(kwargs map[string]interface{}) {
- go s.reload(s.GetConfig(kwargs), s.GetConfigFile())
- }
- func (s *STelegraf) BgReloadConf(kwargs map[string]interface{}) {
- go func() {
- reload, err := s.reloadConf(s.GetConfig(kwargs), s.GetConfigFile())
- if err != nil {
- log.Errorf("Failed reload conf: %s", err)
- }
- if reload {
- err := s.ReloadTelegraf()
- if err != nil {
- log.Errorf("failed reload telegraf: %s", err)
- }
- }
- }()
- }
- func (s *STelegraf) ReloadTelegraf() error {
- log.Infof("Start reloading telegraf...")
- errs := []error{}
- if err := s.reloadTelegrafByHTTP(); err != nil {
- errs = append(errs, errors.Wrap(err, "reloadTelegrafByHTTP"))
- return errors.NewAggregate(errs)
- }
- log.Infof("Finish reloading telegraf")
- return nil
- }
- func (s *STelegraf) reloadTelegrafByDocker() error {
- log.Infof("Reloading telegraf by docker...")
- output, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", "/usr/bin/docker ps --filter 'label=io.kubernetes.container.name=telegraf' --format '{{.ID}}'").Output()
- if err != nil {
- return errors.Wrap(err, "using docker ps find telegraf container")
- }
- id := strings.TrimSpace(string(output))
- if len(id) == 0 {
- return errors.Errorf("not found telegraf running container")
- }
- if err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", "/usr/bin/docker restart "+id).Run(); err != nil {
- return errors.Wrapf(err, "restart telegraf container %q", id)
- }
- return nil
- }
- func (s *STelegraf) reloadTelegrafByHTTP() error {
- telegrafReoladUrl := "http://localhost:8087/reload"
- log.Infof("Reloading telegraf by %q ...", telegrafReoladUrl)
- if _, _, err := httputils.JSONRequest(
- httputils.GetDefaultClient(), context.Background(),
- "POST", telegrafReoladUrl, nil, nil, false,
- ); err != nil {
- return errors.Wrap(err, "reload telegraf by http reload api")
- }
- return nil
- }
// hasIPMIDevice reports whether any of the known IPMI device files exists,
// i.e. whether os.Stat succeeds on /dev/ipmi0, /dev/ipmi/0 or /dev/ipmidev/0
// (checked in that order, stopping at the first hit).
func hasIPMIDevice() bool {
	for _, devPath := range [...]string{"/dev/ipmi0", "/dev/ipmi/0", "/dev/ipmidev/0"} {
		_, statErr := os.Stat(devPath)
		if statErr == nil {
			return true
		}
	}
	return false
}
|