| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package metadata
- import (
- "bytes"
- "compress/gzip"
- "context"
- "io"
- "net/http"
- "strings"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/onecloud/pkg/hostman/guestman/desc"
- )
// telegrafInfluxMaxBodyBytes is the upper bound, in bytes (16 MiB), applied to
// a telegraf InfluxDB write payload both as received and after gzip
// decompression.
const telegrafInfluxMaxBodyBytes = 16 * 1024 * 1024
- func (s *Service) rewriteTelegrafInfluxBodyIfNeeded(ctx context.Context, r *http.Request) error {
- prefix := s.monitorPrefix()
- if !strings.HasPrefix(r.URL.Path, prefix) {
- return nil
- }
- sub := r.URL.Path[len(prefix):]
- if r.Method != http.MethodPost && r.Method != http.MethodPut {
- return nil
- }
- if sub != "/write" && !strings.HasPrefix(sub, "/write?") {
- return nil
- }
- body, err := io.ReadAll(io.LimitReader(r.Body, telegrafInfluxMaxBodyBytes+1))
- if err != nil {
- return errors.Wrap(err, "read telegraf influx body")
- }
- if len(body) > telegrafInfluxMaxBodyBytes {
- return errors.Errorf("telegraf influx body exceeds %d bytes", telegrafInfluxMaxBodyBytes)
- }
- _ = r.Body.Close()
- if strings.Contains(strings.ToLower(r.Header.Get("Content-Encoding")), "gzip") {
- gr, err := gzip.NewReader(bytes.NewReader(body))
- if err != nil {
- return errors.Wrap(err, "gzip reader")
- }
- body, err = io.ReadAll(io.LimitReader(gr, telegrafInfluxMaxBodyBytes+1))
- _ = gr.Close()
- if err != nil {
- return errors.Wrap(err, "read gzipped telegraf body")
- }
- if len(body) > telegrafInfluxMaxBodyBytes {
- return errors.Errorf("telegraf influx body exceeds %d bytes after gzip", telegrafInfluxMaxBodyBytes)
- }
- r.Header.Del("Content-Encoding")
- }
- newBody, changed, err := rewriteInfluxLineProtocolTenant(body, func(vmId string) (string, bool) {
- gd := s.lookupGuestDescForTelegraf(r, vmId)
- if gd == nil || gd.TenantId == "" {
- return "", false
- }
- return gd.TenantId, true
- })
- if err != nil {
- return err
- }
- if changed {
- log.Debugf("metadata monitor: corrected tenant_id in telegraf influx payload from %s", r.RemoteAddr)
- }
- r.Body = io.NopCloser(bytes.NewReader(newBody))
- r.ContentLength = int64(len(newBody))
- r.Header.Del("Content-Length")
- return nil
- }
- func (s *Service) lookupGuestDescForTelegraf(r *http.Request, vmId string) *desc.SGuestDesc {
- if vmId == "" {
- return nil
- }
- gd := s.getGuestDesc(r)
- if gd != nil && gd.Uuid == vmId {
- return gd
- }
- return nil
- }
- func rewriteInfluxLineProtocolTenant(body []byte, resolveTenant func(vmId string) (tenantId string, ok bool)) ([]byte, bool, error) {
- raw := strings.Split(string(body), "\n")
- changed := false
- for i, line := range raw {
- line = strings.TrimRight(line, "\r")
- if line == "" || strings.HasPrefix(line, "#") {
- continue
- }
- newLine, lineChanged, err := rewriteInfluxLineTenant(line, resolveTenant)
- if err != nil {
- return body, false, err
- }
- if lineChanged {
- changed = true
- raw[i] = newLine
- }
- }
- if !changed {
- return body, false, nil
- }
- return []byte(strings.Join(raw, "\n")), true, nil
- }
- func rewriteInfluxLineTenant(line string, resolveTenant func(vmId string) (tenantId string, ok bool)) (string, bool, error) {
- measTags, fields, ok := splitMeasurementTagsAndFields(line)
- if !ok {
- return line, false, nil
- }
- parts := splitOnUnescapedComma(measTags)
- if len(parts) < 1 {
- return line, false, nil
- }
- measurement := parts[0]
- tagSegs := parts[1:]
- vmId := ""
- haveTenant := false
- curTenant := ""
- for _, seg := range tagSegs {
- k, v := splitInfluxTagKeyValue(seg)
- if k == "" {
- continue
- }
- switch k {
- case "vm_id":
- vmId = v
- case "tenant_id":
- haveTenant = true
- curTenant = v
- }
- }
- if vmId == "" {
- return line, false, nil
- }
- expectTenant, ok := resolveTenant(vmId)
- if !ok {
- return line, false, nil
- }
- if haveTenant && curTenant == expectTenant {
- return line, false, nil
- }
- newSegs := make([]string, 0, len(tagSegs)+1)
- for _, seg := range tagSegs {
- k, _ := splitInfluxTagKeyValue(seg)
- if k == "tenant_id" {
- newSegs = append(newSegs, "tenant_id="+expectTenant)
- } else {
- newSegs = append(newSegs, seg)
- }
- }
- if !haveTenant {
- newSegs = append(newSegs, "tenant_id="+expectTenant)
- }
- var b strings.Builder
- b.WriteString(measurement)
- for _, seg := range newSegs {
- b.WriteByte(',')
- b.WriteString(seg)
- }
- b.WriteByte(' ')
- b.WriteString(fields)
- return b.String(), true, nil
- }
- func splitMeasurementTagsAndFields(line string) (measTags string, fields string, ok bool) {
- for i := 0; i < len(line); i++ {
- if line[i] == ' ' && !influxByteEscaped(line, i) {
- return line[:i], line[i+1:], true
- }
- }
- return "", "", false
- }
// influxByteEscaped reports whether the byte at index i of line is preceded
// by an odd number of consecutive backslashes, i.e. whether it is escaped.
func influxByteEscaped(line string, i int) bool {
	escaped := false
	// Walk left over the run of backslashes, flipping parity for each one.
	for k := i - 1; k >= 0 && line[k] == '\\'; k-- {
		escaped = !escaped
	}
	return escaped
}
// splitOnUnescapedComma splits s at every comma that is not preceded by an
// escaping backslash. Backslashes and the bytes they escape are kept verbatim
// in the returned segments. The result always has at least one element; an
// empty input yields a single empty string.
func splitOnUnescapedComma(s string) []string {
	var (
		segments []string
		current  strings.Builder
	)
	afterBackslash := false
	for _, ch := range []byte(s) {
		switch {
		case afterBackslash:
			// Escaped byte: copy it through, even if it is a comma.
			afterBackslash = false
			current.WriteByte(ch)
		case ch == '\\':
			afterBackslash = true
			current.WriteByte(ch)
		case ch == ',':
			segments = append(segments, current.String())
			current.Reset()
		default:
			current.WriteByte(ch)
		}
	}
	return append(segments, current.String())
}
- func splitInfluxTagKeyValue(seg string) (key, val string) {
- for i := 0; i < len(seg); i++ {
- if seg[i] == '=' && !influxByteEscaped(seg, i) {
- return influxUnescapeTagKey(seg[:i]), seg[i+1:]
- }
- }
- return "", ""
- }
- func influxUnescapeTagKey(s string) string {
- return influxUnescapeTag(s)
- }
// influxUnescapeTag removes backslash escapes from s: a backslash followed by
// a backslash, space, comma or equals sign is replaced by that following
// byte. A backslash in front of any other byte, or at the very end of s, is
// kept as is.
func influxUnescapeTag(s string) string {
	// Bytes that may follow a backslash to form an escape sequence.
	const escapable = `\ ,=`

	var out strings.Builder
	out.Grow(len(s))
	for i := 0; i < len(s); i++ {
		ch := s[i]
		if ch == '\\' && i+1 < len(s) {
			if next := s[i+1]; strings.IndexByte(escapable, next) >= 0 {
				// Emit the escaped byte and skip over it.
				ch = next
				i++
			}
		}
		out.WriteByte(ch)
	}
	return out.String()
}
|