host_dmesg.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package hostmetrics
  2. import (
  3. "bufio"
  4. "fmt"
  5. "os"
  6. "path"
  7. "path/filepath"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "yunion.io/x/jsonutils"
  13. "yunion.io/x/log"
  14. "yunion.io/x/pkg/errors"
  15. "yunion.io/x/onecloud/pkg/apis/compute"
  16. "yunion.io/x/onecloud/pkg/hostman/options"
  17. "yunion.io/x/onecloud/pkg/util/fileutils2"
  18. )
  19. const (
  20. kmsgPath = "/dev/kmsg"
  21. batchSize = 100
  22. flushInterval = 100 * time.Second
  23. )
  24. type SHostDmesgCollector struct {
  25. host IHostInfo
  26. mu sync.Mutex
  27. buffer []compute.SKmsgEntry
  28. pendingEntry *compute.SKmsgEntry
  29. bootTime time.Time
  30. }
  31. func NewHostDmesgCollector(hostInfo IHostInfo) *SHostDmesgCollector {
  32. return &SHostDmesgCollector{
  33. host: hostInfo,
  34. mu: sync.Mutex{},
  35. buffer: make([]compute.SKmsgEntry, 0),
  36. }
  37. }
  38. // /dev/kmsg
  39. // level;sequence,timestamp[us];message
  40. // include/linux/kern_levels.h
  41. // #define LOGLEVEL_EMERG 0 /* system is unusable */
  42. // #define LOGLEVEL_ALERT 1 /* action must be taken immediately */
  43. // #define LOGLEVEL_CRIT 2 /* critical conditions */
  44. // #define LOGLEVEL_ERR 3 /* error conditions */
  45. // #define LOGLEVEL_WARNING 4 /* warning conditions */
  46. // #define LOGLEVEL_NOTICE 5 /* normal but significant condition */
  47. // #define LOGLEVEL_INFO 6 /* informational */
  48. // #define LOGLEVEL_DEBUG 7 /* debug-level messages */
  49. func (c *SHostDmesgCollector) Start() {
  50. f, err := os.Open(kmsgPath)
  51. if err != nil {
  52. log.Errorf("failed open %s: %s", kmsgPath, err)
  53. return
  54. }
  55. defer f.Close()
  56. bootTime, err := getBootTime()
  57. if err != nil {
  58. log.Errorf("failed get boot time %s", err)
  59. return
  60. }
  61. c.bootTime = bootTime
  62. var currentBootStamp = bootTime.Unix()
  63. var lastSeq = 0
  64. readerState, err := c.loadState()
  65. if err != nil {
  66. log.Errorf("failed load readers state %s", err)
  67. } else if readerState != nil {
  68. if readerState.BootStamp == currentBootStamp {
  69. lastSeq = readerState.LastSeq
  70. }
  71. }
  72. log.Infof("Start dmesg reader from seq %d", lastSeq)
  73. go func() {
  74. for range time.Tick(flushInterval) {
  75. c.mu.Lock()
  76. c.flushBuffer()
  77. c.mu.Unlock()
  78. }
  79. }()
  80. reader := bufio.NewReader(f)
  81. for {
  82. line, err := reader.ReadString('\n')
  83. if err != nil {
  84. time.Sleep(100 * time.Millisecond)
  85. continue
  86. }
  87. line = strings.TrimSpace(line)
  88. if line == "" {
  89. continue
  90. }
  91. entry, err := c.parseKmsgLine(line, bootTime)
  92. if err != nil {
  93. if c.pendingEntry != nil {
  94. c.pendingEntry.Message += "\n" + line
  95. }
  96. continue
  97. }
  98. if entry.Seq <= lastSeq {
  99. continue
  100. }
  101. // 只上传 warn 以上级别的日志
  102. if entry.Level > 4 || c.isNoise(entry) {
  103. continue
  104. }
  105. c.mu.Lock()
  106. if c.pendingEntry == nil {
  107. c.pendingEntry = entry
  108. } else {
  109. c.buffer = append(c.buffer, *c.pendingEntry)
  110. if len(c.buffer) >= batchSize {
  111. c.flushBuffer()
  112. }
  113. c.pendingEntry = entry
  114. }
  115. c.mu.Unlock()
  116. }
  117. }
  118. func (c *SHostDmesgCollector) isNoise(entry *compute.SKmsgEntry) bool {
  119. if strings.HasPrefix(entry.Message, "IPVS:") {
  120. return true
  121. }
  122. return false
  123. }
  124. // flush buffer util success
  125. func (c *SHostDmesgCollector) flushBuffer() {
  126. if len(c.buffer) == 0 {
  127. return
  128. }
  129. seq := c.buffer[len(c.buffer)-1].Seq
  130. for {
  131. err := c.host.ReportHostDmesg(c.buffer)
  132. if err != nil {
  133. log.Errorf("failed report host dmesg %s", err)
  134. time.Sleep(time.Second * 30)
  135. continue
  136. }
  137. break
  138. }
  139. if err := c.saveState(seq); err != nil {
  140. log.Errorf("failed save dmesg reader state: %s", err)
  141. }
  142. c.buffer = c.buffer[:0]
  143. }
  144. func (c *SHostDmesgCollector) loadState() (*ReaderState, error) {
  145. dmesgStatePath := path.Join(filepath.Dir(options.HostOptions.ServersPath), "dmesg_reader_state")
  146. if !fileutils2.Exists(dmesgStatePath) {
  147. return nil, nil
  148. }
  149. data, err := fileutils2.FileGetContents(dmesgStatePath)
  150. if err != nil {
  151. return nil, err
  152. }
  153. jdata, err := jsonutils.ParseString(data)
  154. if err != nil {
  155. return nil, errors.Wrap(err, "failed parse dmesg reader state")
  156. }
  157. var s ReaderState
  158. err = jdata.Unmarshal(&s)
  159. if err != nil {
  160. return nil, errors.Wrap(err, "failed unmarshal reader state")
  161. }
  162. return &s, nil
  163. }
  164. func (c *SHostDmesgCollector) saveState(seq int) error {
  165. state := &ReaderState{
  166. LastSeq: seq,
  167. BootStamp: c.bootTime.Unix(),
  168. }
  169. jstate := jsonutils.Marshal(state)
  170. dmesgStatePath := path.Join(filepath.Dir(options.HostOptions.ServersPath), "dmesg_reader_state")
  171. return fileutils2.FilePutContents(dmesgStatePath, jstate.String(), false)
  172. }
  173. type ReaderState struct {
  174. LastSeq int `json:"last_seq"`
  175. BootStamp int64 `json:"boot_stamp"` // UNIX seconds of boot time
  176. }
  177. func getBootTime() (time.Time, error) {
  178. data, err := fileutils2.FileGetContents("/proc/uptime")
  179. if err != nil {
  180. return time.Time{}, err
  181. }
  182. fields := strings.Fields(data)
  183. if len(fields) < 1 {
  184. return time.Time{}, fmt.Errorf("invalid /proc/uptime")
  185. }
  186. uptimeSec, err := strconv.ParseFloat(fields[0], 64)
  187. if err != nil {
  188. return time.Time{}, err
  189. }
  190. return time.Now().Add(-time.Duration(uptimeSec * float64(time.Second))), nil
  191. }
  192. func (c *SHostDmesgCollector) parseKmsgLine(line string, bootTime time.Time) (*compute.SKmsgEntry, error) {
  193. parts := strings.SplitN(line, ";", 2)
  194. if len(parts) != 2 {
  195. return nil, fmt.Errorf("invalid kmsg line: %s", line)
  196. }
  197. meta := strings.Split(parts[0], ",")
  198. if len(meta) < 3 {
  199. return nil, fmt.Errorf("invalid meta: %s", parts[0])
  200. }
  201. levelStr := strings.Trim(meta[0], "<>")
  202. level, err := strconv.Atoi(levelStr)
  203. if err != nil {
  204. return nil, err
  205. }
  206. seq, _ := strconv.Atoi(meta[1])
  207. timestamp, _ := strconv.ParseUint(meta[2], 10, 64)
  208. rel := time.Duration(timestamp) * time.Microsecond
  209. abs := bootTime.Add(rel)
  210. return &compute.SKmsgEntry{
  211. Level: level,
  212. Seq: seq,
  213. Message: parts[1],
  214. Time: abs,
  215. }, nil
  216. }