main.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "context"
  17. "fmt"
  18. "os"
  19. "strings"
  20. "time"
  21. v1 "k8s.io/api/core/v1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/client-go/kubernetes"
  24. "k8s.io/client-go/rest"
  25. "yunion.io/x/jsonutils"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/util/httputils"
  28. "yunion.io/x/onecloud/pkg/baremetal/utils/detect_storages"
  29. "yunion.io/x/onecloud/pkg/baremetal/utils/raid/drivers"
  30. "yunion.io/x/onecloud/pkg/compute/baremetal"
  31. "yunion.io/x/onecloud/pkg/util/procutils"
  32. )
  // INTERVAL_SECOND is the interval, in seconds, that main hands to
  // NewRaidInfoCollector as the minimum time between two RAID collections.
  const INTERVAL_SECOND = 300

  // TelegrafServer is the local Telegraf HTTP write endpoint that the RAID
  // report lines are POSTed to.
  const TelegrafServer = "http://localhost:8087/write"
  37. // Failed, Offline, Degraded, Rebuilding, Out of Sync (OSY)
  38. func main() {
  39. logLevel := "debug"
  40. logVerboseLevel := 5
  41. log.SetVerboseLevel(int32(logVerboseLevel))
  42. err := log.SetLogLevelByString(log.Logger(), logLevel)
  43. if err != nil {
  44. log.Fatalf("Set log level %q: %v", logLevel, err)
  45. }
  46. // creates the in-cluster config
  47. config, err := rest.InClusterConfig()
  48. if err != nil {
  49. log.Fatalln(err)
  50. }
  51. // creates the clientset
  52. clientset, err := kubernetes.NewForConfig(config)
  53. if err != nil {
  54. log.Fatalln(err)
  55. }
  56. nodeName := os.Getenv("NODENAME")
  57. if len(nodeName) == 0 {
  58. log.Fatalf("Missing env nodename")
  59. }
  60. node, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
  61. if err != nil {
  62. log.Fatalln(err)
  63. }
  64. var masterAddress string
  65. if length := len(node.Status.Conditions); length > 0 {
  66. if node.Status.Conditions[length-1].Type == v1.NodeReady &&
  67. node.Status.Conditions[length-1].Status == v1.ConditionTrue {
  68. for _, addr := range node.Status.Addresses {
  69. if addr.Type == v1.NodeInternalIP {
  70. masterAddress = addr.Address
  71. break
  72. }
  73. }
  74. }
  75. }
  76. log.Infof("Start Colloct Raid Info And Send To Telegraf ...")
  77. c := NewRaidInfoCollector(nodeName, masterAddress, INTERVAL_SECOND)
  78. c.Start()
  79. }
  // RaidInfoCollector gathers RAID disk information on this host and reports it
  // to Telegraf, keeping the lines of failed uploads for a later retry.
  type RaidInfoCollector struct {
  	// waitingReportData queues timestamped lines whose upload failed; they are
  	// re-sent after a later successful upload.
  	waitingReportData []string
  	// LastCollectTime is when runMain last started a collection; its zero
  	// value lets the first runMain call collect right away.
  	LastCollectTime time.Time
  	// ReportInterval is the minimum number of seconds between collections.
  	ReportInterval int
  	// Hostname and HostIp are reported as the "hostname" and "host_ip" tags.
  	Hostname, HostIp string
  }
  87. func NewRaidInfoCollector(hostname, hostIp string, interval int) *RaidInfoCollector {
  88. return &RaidInfoCollector{
  89. waitingReportData: make([]string, 0),
  90. ReportInterval: interval,
  91. Hostname: hostname,
  92. HostIp: hostIp,
  93. }
  94. }
  95. func (c *RaidInfoCollector) runMain() {
  96. timeBegin := time.Now()
  97. elapse := timeBegin.Sub(c.LastCollectTime)
  98. if elapse < time.Second*time.Duration(c.ReportInterval) {
  99. return
  100. } else {
  101. c.LastCollectTime = timeBegin
  102. }
  103. c.runMonitor()
  104. }
  105. func (c *RaidInfoCollector) runMonitor() {
  106. reportData := c.collectReportData()
  107. if len(reportData) > 0 {
  108. c.reportRaidInfoToTelegraf(reportData)
  109. }
  110. }
  111. func (c *RaidInfoCollector) collectReportData() string {
  112. if len(c.waitingReportData) > 60 {
  113. c.waitingReportData = c.waitingReportData[1:]
  114. }
  115. return c.CollectReportData()
  116. }
  117. func (c *RaidInfoCollector) CollectReportData() string {
  118. raidDiskInfo := make([]*baremetal.BaremetalStorage, 0)
  119. // raidDrivers := []string{}
  120. for _, drv := range drivers.GetDrivers(drivers.NewExecutor()) {
  121. if err := drv.ParsePhyDevs(); err != nil {
  122. log.Warningf("Raid driver %s ParsePhyDevs failed: %s", drv.GetName(), err)
  123. continue
  124. }
  125. raidDiskInfo = append(raidDiskInfo, detect_storages.GetRaidDevices(drv)...)
  126. // raidDrivers = append(raidDrivers, drv.GetName())
  127. }
  128. if len(raidDiskInfo) > 0 {
  129. ret := c.toTelegrafReportData(raidDiskInfo)
  130. return ret
  131. }
  132. return ""
  133. }
  134. func (c *RaidInfoCollector) Start() {
  135. go procutils.WaitZombieLoop(context.TODO())
  136. for {
  137. c.runMain()
  138. time.Sleep(time.Second * 1)
  139. }
  140. }
  141. const MEASUREMENT = "host_raid"
  142. func (c *RaidInfoCollector) toTelegrafReportData(raidDiskInfo []*baremetal.BaremetalStorage) string {
  143. tag := fmt.Sprintf("%s=%s,%s=%s", "hostname", c.Hostname, "host_ip", c.HostIp)
  144. ret := []string{}
  145. for i := 0; i < len(raidDiskInfo); i++ {
  146. fieldArr := []string{}
  147. tagArr := []string{}
  148. raidDiskInfo[i].Status = strings.ToLower(raidDiskInfo[i].Status)
  149. jStat := jsonutils.Marshal(raidDiskInfo[i])
  150. jMap, _ := jStat.GetMap()
  151. for k, v := range jMap {
  152. vStr, _ := v.GetString()
  153. if vStr == "" {
  154. continue
  155. }
  156. vStr = strings.ReplaceAll(vStr, " ", "\\ ")
  157. kv := fmt.Sprintf("%s=%s", k, vStr)
  158. switch k {
  159. case "adapter", "slot":
  160. fieldArr = append(fieldArr, kv)
  161. case "model", "driver", "status":
  162. tagArr = append(tagArr, kv)
  163. case "index":
  164. continue
  165. default:
  166. tagArr = append(tagArr, kv)
  167. }
  168. }
  169. field := strings.Join(fieldArr, ",")
  170. diskTag := tag + "," + strings.Join(tagArr, ",")
  171. line := fmt.Sprintf("%s,%s %s", MEASUREMENT, diskTag, field)
  172. ret = append(ret, line)
  173. }
  174. return strings.Join(ret, "\n")
  175. }
  176. func (c *RaidInfoCollector) reportRaidInfoToTelegraf(data string) {
  177. body := strings.NewReader(data)
  178. res, err := httputils.Request(
  179. httputils.GetDefaultClient(), context.Background(), "POST", TelegrafServer, nil, body, true)
  180. if err != nil {
  181. log.Errorf("Upload guest metric failed: %s", err)
  182. return
  183. }
  184. defer res.Body.Close()
  185. if res.StatusCode != 204 {
  186. log.Errorf("upload guest metric failed %d", res.StatusCode)
  187. timestamp := time.Now().UnixNano()
  188. for _, line := range strings.Split(data, "\n") {
  189. c.waitingReportData = append(c.waitingReportData,
  190. fmt.Sprintf("%s %d", line, timestamp))
  191. }
  192. } else {
  193. if len(c.waitingReportData) > 0 {
  194. oldDatas := strings.Join(c.waitingReportData, "\n")
  195. body = strings.NewReader(oldDatas)
  196. res, err = httputils.Request(
  197. httputils.GetDefaultClient(), context.Background(), "POST", TelegrafServer, nil, body, false)
  198. if err == nil {
  199. defer res.Body.Close()
  200. }
  201. if res.StatusCode == 204 {
  202. c.waitingReportData = c.waitingReportData[len(c.waitingReportData):]
  203. } else {
  204. log.Errorf("upload guest metric failed code: %d", res.StatusCode)
  205. }
  206. }
  207. }
  208. }