| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- package hostmetrics
- import (
- "bufio"
- "fmt"
- "os"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/hostman/options"
- "yunion.io/x/onecloud/pkg/util/fileutils2"
- )
// kmsgPath is the kernel log device that Start reads records from.
const kmsgPath = "/dev/kmsg"

// batchSize is the number of buffered entries that triggers an immediate flush.
const batchSize = 100

// flushInterval is how often buffered entries are flushed regardless of count.
const flushInterval = 100 * time.Second
- type SHostDmesgCollector struct {
- host IHostInfo
- mu sync.Mutex
- buffer []compute.SKmsgEntry
- pendingEntry *compute.SKmsgEntry
- bootTime time.Time
- }
- func NewHostDmesgCollector(hostInfo IHostInfo) *SHostDmesgCollector {
- return &SHostDmesgCollector{
- host: hostInfo,
- mu: sync.Mutex{},
- buffer: make([]compute.SKmsgEntry, 0),
- }
- }
- // /dev/kmsg
- // level;sequence,timestamp[us];message
- // include/linux/kern_levels.h
- // #define LOGLEVEL_EMERG 0 /* system is unusable */
- // #define LOGLEVEL_ALERT 1 /* action must be taken immediately */
- // #define LOGLEVEL_CRIT 2 /* critical conditions */
- // #define LOGLEVEL_ERR 3 /* error conditions */
- // #define LOGLEVEL_WARNING 4 /* warning conditions */
- // #define LOGLEVEL_NOTICE 5 /* normal but significant condition */
- // #define LOGLEVEL_INFO 6 /* informational */
- // #define LOGLEVEL_DEBUG 7 /* debug-level messages */
- func (c *SHostDmesgCollector) Start() {
- f, err := os.Open(kmsgPath)
- if err != nil {
- log.Errorf("failed open %s: %s", kmsgPath, err)
- return
- }
- defer f.Close()
- bootTime, err := getBootTime()
- if err != nil {
- log.Errorf("failed get boot time %s", err)
- return
- }
- c.bootTime = bootTime
- var currentBootStamp = bootTime.Unix()
- var lastSeq = 0
- readerState, err := c.loadState()
- if err != nil {
- log.Errorf("failed load readers state %s", err)
- } else if readerState != nil {
- if readerState.BootStamp == currentBootStamp {
- lastSeq = readerState.LastSeq
- }
- }
- log.Infof("Start dmesg reader from seq %d", lastSeq)
- go func() {
- for range time.Tick(flushInterval) {
- c.mu.Lock()
- c.flushBuffer()
- c.mu.Unlock()
- }
- }()
- reader := bufio.NewReader(f)
- for {
- line, err := reader.ReadString('\n')
- if err != nil {
- time.Sleep(100 * time.Millisecond)
- continue
- }
- line = strings.TrimSpace(line)
- if line == "" {
- continue
- }
- entry, err := c.parseKmsgLine(line, bootTime)
- if err != nil {
- if c.pendingEntry != nil {
- c.pendingEntry.Message += "\n" + line
- }
- continue
- }
- if entry.Seq <= lastSeq {
- continue
- }
- // 只上传 warn 以上级别的日志
- if entry.Level > 4 || c.isNoise(entry) {
- continue
- }
- c.mu.Lock()
- if c.pendingEntry == nil {
- c.pendingEntry = entry
- } else {
- c.buffer = append(c.buffer, *c.pendingEntry)
- if len(c.buffer) >= batchSize {
- c.flushBuffer()
- }
- c.pendingEntry = entry
- }
- c.mu.Unlock()
- }
- }
- func (c *SHostDmesgCollector) isNoise(entry *compute.SKmsgEntry) bool {
- if strings.HasPrefix(entry.Message, "IPVS:") {
- return true
- }
- return false
- }
- // flush buffer util success
- func (c *SHostDmesgCollector) flushBuffer() {
- if len(c.buffer) == 0 {
- return
- }
- seq := c.buffer[len(c.buffer)-1].Seq
- for {
- err := c.host.ReportHostDmesg(c.buffer)
- if err != nil {
- log.Errorf("failed report host dmesg %s", err)
- time.Sleep(time.Second * 30)
- continue
- }
- break
- }
- if err := c.saveState(seq); err != nil {
- log.Errorf("failed save dmesg reader state: %s", err)
- }
- c.buffer = c.buffer[:0]
- }
- func (c *SHostDmesgCollector) loadState() (*ReaderState, error) {
- dmesgStatePath := path.Join(filepath.Dir(options.HostOptions.ServersPath), "dmesg_reader_state")
- if !fileutils2.Exists(dmesgStatePath) {
- return nil, nil
- }
- data, err := fileutils2.FileGetContents(dmesgStatePath)
- if err != nil {
- return nil, err
- }
- jdata, err := jsonutils.ParseString(data)
- if err != nil {
- return nil, errors.Wrap(err, "failed parse dmesg reader state")
- }
- var s ReaderState
- err = jdata.Unmarshal(&s)
- if err != nil {
- return nil, errors.Wrap(err, "failed unmarshal reader state")
- }
- return &s, nil
- }
- func (c *SHostDmesgCollector) saveState(seq int) error {
- state := &ReaderState{
- LastSeq: seq,
- BootStamp: c.bootTime.Unix(),
- }
- jstate := jsonutils.Marshal(state)
- dmesgStatePath := path.Join(filepath.Dir(options.HostOptions.ServersPath), "dmesg_reader_state")
- return fileutils2.FilePutContents(dmesgStatePath, jstate.String(), false)
- }
// ReaderState is the persisted progress of the dmesg reader, stored as JSON
// by saveState and read back by loadState.
type ReaderState struct {
	// LastSeq is the kmsg sequence number of the newest reported entry.
	LastSeq int `json:"last_seq"`
	// BootStamp identifies the boot LastSeq belongs to: the boot time in
	// UNIX seconds.
	BootStamp int64 `json:"boot_stamp"`
}
- func getBootTime() (time.Time, error) {
- data, err := fileutils2.FileGetContents("/proc/uptime")
- if err != nil {
- return time.Time{}, err
- }
- fields := strings.Fields(data)
- if len(fields) < 1 {
- return time.Time{}, fmt.Errorf("invalid /proc/uptime")
- }
- uptimeSec, err := strconv.ParseFloat(fields[0], 64)
- if err != nil {
- return time.Time{}, err
- }
- return time.Now().Add(-time.Duration(uptimeSec * float64(time.Second))), nil
- }
- func (c *SHostDmesgCollector) parseKmsgLine(line string, bootTime time.Time) (*compute.SKmsgEntry, error) {
- parts := strings.SplitN(line, ";", 2)
- if len(parts) != 2 {
- return nil, fmt.Errorf("invalid kmsg line: %s", line)
- }
- meta := strings.Split(parts[0], ",")
- if len(meta) < 3 {
- return nil, fmt.Errorf("invalid meta: %s", parts[0])
- }
- levelStr := strings.Trim(meta[0], "<>")
- level, err := strconv.Atoi(levelStr)
- if err != nil {
- return nil, err
- }
- seq, _ := strconv.Atoi(meta[1])
- timestamp, _ := strconv.ParseUint(meta[2], 10, 64)
- rel := time.Duration(timestamp) * time.Microsecond
- abs := bootTime.Add(rel)
- return &compute.SKmsgEntry{
- Level: level,
- Seq: seq,
- Message: parts[1],
- Time: abs,
- }, nil
- }
|