client.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package victoriametrics
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "time"
  24. "github.com/influxdata/influxql"
  25. "golang.org/x/net/context/ctxhttp"
  26. "moul.io/http2curl/v2"
  27. "yunion.io/x/log"
  28. "yunion.io/x/pkg/errors"
  29. "yunion.io/x/pkg/util/httputils"
  30. )
  31. const (
  32. ErrVMInvalidResponse = errors.Error("VictoriaMetrics invalid response")
  33. )
  // TimeRange is a query window described by two int64 bounds.
  //
  // Within this file the bounds are filled from time.Time.Unix() and a bound of
  // 0 is treated as "not set" when the query_range request is built.
  type TimeRange struct {
  	Start, End int64
  }

  // NewTimeRange returns a newly allocated TimeRange holding the given start
  // and end bounds unchanged.
  func NewTimeRange(start, end int64) *TimeRange {
  	tr := new(TimeRange)
  	tr.Start = start
  	tr.End = end
  	return tr
  }
  44. func NewTimeRangeByInfluxTimeRange(tr *influxql.TimeRange) *TimeRange {
  45. // format should be: https://docs.victoriametrics.com/#timestamp-formats
  46. nTr := &TimeRange{}
  47. if !tr.MinTime().IsZero() {
  48. nTr.Start = tr.MinTime().Unix()
  49. }
  50. if !tr.MaxTime().IsZero() {
  51. nTr.End = tr.MaxTime().Unix()
  52. }
  53. return nTr
  54. }
  55. type Client interface {
  56. QueryRange(ctx context.Context, httpCli *http.Client, query string, step time.Duration, timeRange *TimeRange, disableCache bool) (*Response, error)
  57. RawQuery(ctx context.Context, httpCli *http.Client, query string, disableCache bool) (*Response, error)
  58. }
  // client is the Client implementation of this package; it is bound to a
  // single endpoint.
  type client struct {
  	// endpoint is the endpoint string exactly as it was given to NewClient.
  	endpoint string
  	// endpointURL is the parsed form of endpoint, kept by value.
  	endpointURL url.URL
  }

  // getAPIURL returns the absolute URL of the API resource reqPath, i.e. the
  // endpoint URL with "/api/v1/<reqPath>" joined onto its path. path.Join cleans
  // the result, so duplicate or trailing slashes are collapsed.
  func (c *client) getAPIURL(reqPath string) string {
  	const apiPrefix = "/api/v1"

  	// Assigning the struct copies it, so the stored endpoint URL is not modified.
  	target := c.endpointURL
  	target.Path = path.Join(target.Path, apiPrefix+"/"+reqPath)
  	return target.String()
  }
  // Wire types used to decode the JSON body returned by the query endpoints.
  type (
  	// ResponseDataResultValue is a single sample, for example:
  	// [ 1652169600, "1" ]
  	ResponseDataResultValue []interface{}

  	// ResponseDataResult is one series of the result set.
  	ResponseDataResult struct {
  		// Metric holds the label set of the series.
  		Metric map[string]string `json:"metric"`
  		// Values holds the samples decoded from the "values" key.
  		Values []ResponseDataResultValue `json:"values"`
  		// Value is returned by queries such as
  		// sum(histogram_over_time(cpu_usage_active[24h])) by (vmrange)
  		Value ResponseDataResultValue `json:"value"`
  	}

  	// ResponseData is the payload section of a Response.
  	ResponseData struct {
  		ResultType string               `json:"resultType"`
  		Result     []ResponseDataResult `json:"result"`
  	}

  	// ResponseStats carries the statistics section of a Response.
  	ResponseStats struct {
  		// SeriesFetched is an integer encoded as a string, e.g.
  		// {seriesFetched: "2"}
  		SeriesFetched string `json:"seriesFetched"`
  	}

  	// Response is the top-level document decoded from a query reply.
  	//
  	// Data and Stats carry no json tag; when decoding, encoding/json matches
  	// them against the "data" and "stats" keys case-insensitively.
  	Response struct {
  		Status string `json:"status"`
  		Data   ResponseData
  		Stats  ResponseStats
  	}
  )
  91. // QueryRange implements Client.
  92. func (c *client) QueryRange(ctx context.Context, httpCli *http.Client, query string, step time.Duration, tr *TimeRange, disableCache bool) (*Response, error) {
  93. req, err := c.createQueryRangeReq(query, step, tr, disableCache)
  94. if err != nil {
  95. return nil, errors.Wrap(err, "get request")
  96. }
  97. resp, err := ctxhttp.Do(ctx, httpCli, req)
  98. if err != nil {
  99. return nil, errors.Wrap(err, "Do request")
  100. }
  101. defer httputils.CloseResponse(resp)
  102. if resp.StatusCode/100 != 2 {
  103. var msg string
  104. if resp.Body != nil {
  105. errMsg, err := io.ReadAll(resp.Body)
  106. if err != nil {
  107. log.Errorf("request error, io.ReadAll error %s", err)
  108. } else {
  109. msg = string(errMsg)
  110. log.Errorf("ctxhttp.Do fail with status %d message %s", resp.StatusCode, errMsg)
  111. }
  112. }
  113. return nil, errors.Wrapf(ErrVMInvalidResponse, "status code: %d (%s)", resp.StatusCode, msg)
  114. }
  115. var response Response
  116. dec := json.NewDecoder(resp.Body)
  117. dec.UseNumber()
  118. if err := dec.Decode(&response); err != nil {
  119. return nil, errors.Wrap(err, "decode json response")
  120. }
  121. return &response, nil
  122. }
  123. func (c *client) createQueryRangeReq(query string, step time.Duration, tr *TimeRange, disableCache bool) (*http.Request, error) {
  124. reqURL := c.getAPIURL("/query_range")
  125. req, err := http.NewRequest(http.MethodGet, reqURL, nil)
  126. if err != nil {
  127. return nil, errors.Wrapf(err, "new HTTP request of: %s", reqURL)
  128. }
  129. req.Header.Set("User-Agent", "Cloudpods Monitor Service")
  130. params := req.URL.Query()
  131. params.Set("query", query)
  132. if step != 0 {
  133. params.Set("step", step.String())
  134. }
  135. if tr != nil {
  136. if tr.Start != 0 {
  137. params.Set("start", fmt.Sprintf("%d", tr.Start))
  138. }
  139. if tr.End != 0 {
  140. params.Set("end", fmt.Sprintf("%d", tr.End))
  141. }
  142. }
  143. if disableCache {
  144. params.Set("nocache", "1")
  145. }
  146. req.URL.RawQuery = params.Encode()
  147. curlCmd, _ := http2curl.GetCurlCommand(req)
  148. log.Infof("VictoriaMetrics curl cmd: %s", curlCmd)
  149. return req, nil
  150. }
  151. func (c *client) RawQuery(ctx context.Context, httpCli *http.Client, query string, disableCache bool) (*Response, error) {
  152. reqURL := c.getAPIURL("/query")
  153. req, err := http.NewRequest(http.MethodGet, reqURL, nil)
  154. if err != nil {
  155. return nil, errors.Wrapf(err, "new HTTP request of: %s", reqURL)
  156. }
  157. req.Header.Set("User-Agent", "Cloudpods Monitor Service")
  158. params := req.URL.Query()
  159. params.Set("query", query)
  160. if disableCache {
  161. params.Set("nocache", "1")
  162. }
  163. req.URL.RawQuery = params.Encode()
  164. curlCmd, _ := http2curl.GetCurlCommand(req)
  165. log.Infof("VictoriaMetrics curl cmd: %s", curlCmd)
  166. resp, err := ctxhttp.Do(ctx, httpCli, req)
  167. if err != nil {
  168. return nil, errors.Wrap(err, "Do request")
  169. }
  170. defer resp.Body.Close()
  171. if resp.StatusCode/100 != 2 {
  172. return nil, errors.Wrapf(ErrVMInvalidResponse, "status code: %d", resp.StatusCode)
  173. }
  174. var response Response
  175. dec := json.NewDecoder(resp.Body)
  176. dec.UseNumber()
  177. if err := dec.Decode(&response); err != nil {
  178. return nil, errors.Wrap(err, "decode json response")
  179. }
  180. return &response, nil
  181. }
  182. func NewClient(endpoint string) (Client, error) {
  183. u, err := url.Parse(endpoint)
  184. if err != nil {
  185. return nil, errors.Wrapf(err, "invalid url: %q", endpoint)
  186. }
  187. cli := &client{
  188. endpoint: endpoint,
  189. endpointURL: *u,
  190. }
  191. return cli, nil
  192. }