response_parser.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package influxdb
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "regexp"
  19. "strconv"
  20. "strings"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/sets"
  24. "yunion.io/x/onecloud/pkg/apis/monitor"
  25. "yunion.io/x/onecloud/pkg/monitor/tsdb"
  26. )
  27. type ResponseParser struct{}
  28. var (
  29. legendFormat *regexp.Regexp
  30. )
  31. func init() {
  32. legendFormat = regexp.MustCompile(`\[\[(\w+)(\.\w+)*\]\]*|\$\s*(\w+?)*`)
  33. }
  34. func (rp *ResponseParser) Parse(response *Response, query *Query) *tsdb.QueryResult {
  35. queryRes := tsdb.NewQueryResult()
  36. for _, result := range response.Results {
  37. queryRes.Series = append(queryRes.Series, rp.transformRowsV2(result.Series, queryRes, query)...)
  38. }
  39. return queryRes
  40. }
  41. func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult, query *Query) monitor.TimeSeriesSlice {
  42. var result monitor.TimeSeriesSlice
  43. for _, row := range rows {
  44. for columnIndex, column := range row.Columns {
  45. if column == "time" {
  46. continue
  47. }
  48. var points monitor.TimeSeriesPoints
  49. for _, valuePair := range row.Values {
  50. point, err := rp.parseTimepoint(valuePair, columnIndex)
  51. if err == nil {
  52. points = append(points, point)
  53. }
  54. }
  55. result = append(result, &monitor.TimeSeries{
  56. Name: rp.formatSerieName(row, column, query),
  57. Points: points,
  58. Tags: row.Tags,
  59. })
  60. }
  61. }
  62. return result
  63. }
  64. func (rp *ResponseParser) transformRowsV2(rows []Row, queryResult *tsdb.QueryResult, query *Query) monitor.TimeSeriesSlice {
  65. var result monitor.TimeSeriesSlice
  66. // 添加值不同的 tag key
  67. diffTagKeys := sets.NewString()
  68. if len(rows) > 1 {
  69. row0 := rows[0]
  70. restRows := rows[1:]
  71. for tagKey, tagVal := range row0.Tags {
  72. for _, rr := range restRows {
  73. resultTagVal := rr.Tags[tagKey]
  74. if tagVal != resultTagVal {
  75. diffTagKeys.Insert(tagKey)
  76. break
  77. }
  78. }
  79. }
  80. }
  81. for idx, row := range rows {
  82. col := ""
  83. columns := make([]string, 0)
  84. for _, column := range row.Columns {
  85. if column == "time" {
  86. continue
  87. }
  88. columns = append(columns, column)
  89. if col == "" {
  90. col = column
  91. continue
  92. }
  93. col = fmt.Sprintf("%s-%s", col, column)
  94. }
  95. columns = append(columns, "time")
  96. var points monitor.TimeSeriesPoints
  97. for _, valuePair := range row.Values {
  98. point, err := rp.parseTimepointV2(valuePair)
  99. if err == nil {
  100. points = append(points, point)
  101. } else {
  102. log.Errorf("rp.parseTimepointV2 error: %v", err)
  103. }
  104. }
  105. tags := make(map[string]string)
  106. for key, val := range row.Tags {
  107. val_ := strings.ReplaceAll(val, "+", " ")
  108. tags[key] = val_
  109. }
  110. name := rp.formatSerieName(row, col, query)
  111. ts := tsdb.NewTimeSeries(name, formatRawName(idx, name, query, tags, diffTagKeys), columns, points, tags)
  112. result = append(result, ts)
  113. }
  114. return result
  115. }
  116. func formatRawName(idx int, name string, query *Query, tags map[string]string, diffTagKeys sets.String) string {
  117. groupByTags := []string{}
  118. for _, group := range query.GroupBy {
  119. if group.Type == "tag" {
  120. groupByTags = append(groupByTags, group.Params[0])
  121. }
  122. }
  123. return tsdb.FormatRawName(idx, name, groupByTags, tags, diffTagKeys)
  124. }
  125. func (rp *ResponseParser) transformRowToTable(row Row, table *tsdb.Table) *tsdb.Table {
  126. for _, col := range row.Columns {
  127. table.Columns = append(table.Columns, tsdb.TableColumn{
  128. Text: col})
  129. }
  130. table.Rows = make([]tsdb.RowValues, len(row.Values))
  131. for _, value := range row.Values {
  132. rowvalue := tsdb.RowValues(value)
  133. table.Rows = append(table.Rows, rowvalue)
  134. }
  135. return table
  136. }
  137. func (rp *ResponseParser) formatSerieName(row Row, column string, query *Query) string {
  138. if query.Alias == "" {
  139. return rp.buildSerieNameFromQuery(row, column)
  140. }
  141. nameSegment := strings.Split(row.Name, ".")
  142. result := legendFormat.ReplaceAllFunc([]byte(query.Alias), func(in []byte) []byte {
  143. aliasFormat := string(in)
  144. aliasFormat = strings.Replace(aliasFormat, "[[", "", 1)
  145. aliasFormat = strings.Replace(aliasFormat, "]]", "", 1)
  146. aliasFormat = strings.Replace(aliasFormat, "$", "", 1)
  147. if aliasFormat == "m" || aliasFormat == "measurement" {
  148. return []byte(query.Measurement)
  149. }
  150. if aliasFormat == "col" {
  151. return []byte(column)
  152. }
  153. pos, err := strconv.Atoi(aliasFormat)
  154. if err == nil && len(nameSegment) >= pos {
  155. return []byte(nameSegment[pos])
  156. }
  157. if !strings.HasPrefix(aliasFormat, "tag_") {
  158. return in
  159. }
  160. tagKey := strings.Replace(aliasFormat, "tag_", "", 1)
  161. tagValue, exist := row.Tags[tagKey]
  162. if exist {
  163. return []byte(tagValue)
  164. }
  165. return in
  166. })
  167. return string(result)
  168. }
  169. func (rp *ResponseParser) buildSerieNameFromQuery(row Row, column string) string {
  170. /*var tags []string
  171. for k, v := range row.Tags {
  172. tags = append(tags, fmt.Sprintf("%s: %s", k, v))
  173. }
  174. tagText := ""
  175. if len(tags) > 0 {
  176. tagText = fmt.Sprintf(" { %s }", strings.Join(tags, " "))
  177. }
  178. return fmt.Sprintf("%s.%s%s", row.Name, column, tagText)*/
  179. return fmt.Sprintf("%s.%s", row.Name, column)
  180. }
  181. func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (monitor.TimePoint, error) {
  182. var value *float64 = rp.parseValue(valuePair[valuePosition])
  183. timestampNumber, _ := valuePair[0].(json.Number)
  184. timestamp, err := timestampNumber.Float64()
  185. if err != nil {
  186. return monitor.TimePoint{}, err
  187. }
  188. return monitor.NewTimePoint(value, timestamp), nil
  189. }
  190. func (rp *ResponseParser) parseTimepointV2(valuePair []interface{}) (monitor.TimePoint, error) {
  191. timepoint := make(monitor.TimePoint, 0)
  192. for i := 1; i < len(valuePair); i++ {
  193. timepoint = append(timepoint, rp.parseValueV2(valuePair[i]))
  194. }
  195. timestampNumber, _ := valuePair[0].(json.Number)
  196. timestamp, err := timestampNumber.Float64()
  197. if err != nil {
  198. return monitor.TimePoint{}, errors.Wrapf(err, "timestampNumber.Float64 of %#v", timestampNumber)
  199. }
  200. timepoint = append(timepoint, timestamp)
  201. return timepoint, nil
  202. }
  203. func (rp *ResponseParser) parseValue(value interface{}) *float64 {
  204. number, ok := value.(json.Number)
  205. if !ok {
  206. return nil
  207. }
  208. fvalue, err := number.Float64()
  209. if err == nil {
  210. return &fvalue
  211. }
  212. ivalue, err := number.Int64()
  213. if err == nil {
  214. ret := float64(ivalue)
  215. return &ret
  216. }
  217. return nil
  218. }
  219. func (rp *ResponseParser) parseValueV2(value interface{}) interface{} {
  220. number, ok := value.(json.Number)
  221. if !ok {
  222. return value
  223. }
  224. fvalue, err := number.Float64()
  225. if err == nil {
  226. return &fvalue
  227. }
  228. ivalue, err := number.Int64()
  229. if err == nil {
  230. ret := float64(ivalue)
  231. return &ret
  232. }
  233. return number.String()
  234. }