bigquery.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package google
  15. import (
  16. "fmt"
  17. "yunion.io/x/jsonutils"
  18. "yunion.io/x/pkg/errors"
  19. )
  20. func (self *SGoogleClient) bigqueryPost(resource string, params map[string]string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  21. resource = fmt.Sprintf("projects/%s/%s", self.projectId, resource)
  22. return jsonRequest(self.client, "POST", GOOGLE_BIGQUERY_DOMAIN, GOOGLE_BIGQUERY_API_VERSION, resource, params, body, self.debug)
  23. }
  24. func (self *SGoogleClient) bigqueryList(resource string, params map[string]string) (jsonutils.JSONObject, error) {
  25. return jsonRequest(self.client, "GET", GOOGLE_BIGQUERY_DOMAIN, GOOGLE_BIGQUERY_API_VERSION, resource, params, nil, self.debug)
  26. }
  27. func (self *SGoogleClient) bigqueryGet(resource string) (jsonutils.JSONObject, error) {
  28. return jsonRequest(self.client, "GET", GOOGLE_BIGQUERY_DOMAIN, GOOGLE_BIGQUERY_API_VERSION, resource, nil, nil, self.debug)
  29. }
  30. func (region *SRegion) BigQuery(sql string) ([]jsonutils.JSONObject, error) {
  31. req := struct {
  32. Kind string `json:"kind"`
  33. Query string `json:"query"`
  34. MaxResults string `json:"maxResults"`
  35. UseLegacySql bool `json:"useLegacySql"`
  36. }{
  37. Kind: "query",
  38. Query: sql,
  39. MaxResults: "1",
  40. UseLegacySql: false,
  41. }
  42. resp, err := region.client.bigqueryPost("queries", nil, jsonutils.Marshal(req))
  43. if err != nil {
  44. return nil, errors.Wrap(err, "bigqueryPost")
  45. }
  46. result := SBigQueryResult{}
  47. err = resp.Unmarshal(&result)
  48. if err != nil {
  49. return nil, errors.Wrap(err, "jsonutils.Unmarshal")
  50. }
  51. rows, err := result.GetRows()
  52. if err != nil {
  53. return nil, errors.Wrap(err, "GetRows")
  54. }
  55. return rows, nil
  56. }
  57. type SBigQueryField struct {
  58. Type string `json:"type"`
  59. Name string `json:"name"`
  60. Mode string `json:"mode"`
  61. Fields []SBigQueryField `json:"fields"`
  62. }
  63. type SBigQuerySchema struct {
  64. Fields []SBigQueryField `json:"fields"`
  65. }
  66. type SBigQueryJobReference struct {
  67. }
  68. type SBigQueryResult struct {
  69. CacheHit bool `json:"cacheHit"`
  70. JobComplete bool `json:"jobComplete"`
  71. JobReference SBigQueryJobReference `json:"jobReference"`
  72. Kind string `json:"kind"`
  73. Rows []jsonutils.JSONObject `json:"rows"`
  74. Schema SBigQuerySchema `json:"schema"`
  75. TotalBytesProcessed int64 `json:"totalBytesProcessed"`
  76. TotalRows int64 `json:"totalRows"`
  77. }
  78. func (res SBigQueryResult) GetRows() ([]jsonutils.JSONObject, error) {
  79. rows := make([]jsonutils.JSONObject, 0)
  80. for _, r := range res.Rows {
  81. row, err := res.Schema.Parse(r)
  82. if err != nil {
  83. return nil, errors.Wrap(err, "Schema.Parse")
  84. }
  85. rows = append(rows, jsonutils.Marshal(row))
  86. }
  87. return rows, nil
  88. }
  89. func (schema SBigQuerySchema) Parse(r jsonutils.JSONObject) (map[string]jsonutils.JSONObject, error) {
  90. f := SBigQueryField{
  91. Type: "RECORD",
  92. Mode: "NULLABLE",
  93. Name: "",
  94. Fields: schema.Fields,
  95. }
  96. nr := jsonutils.NewDict()
  97. nr.Add(r, "v")
  98. return f.Parse(nr, "")
  99. }
  100. func (f SBigQueryField) Parse(r jsonutils.JSONObject, prefix string) (map[string]jsonutils.JSONObject, error) {
  101. if len(prefix) > 0 {
  102. prefix = prefix + "." + f.Name
  103. } else {
  104. prefix = f.Name
  105. }
  106. ret := make(map[string]jsonutils.JSONObject)
  107. switch f.Type {
  108. case "RECORD":
  109. if f.Mode == "REPEATED" {
  110. items, err := r.GetArray("v")
  111. if err != nil {
  112. return nil, errors.Wrapf(err, "GetArray v %s", r)
  113. }
  114. nf := f
  115. nf.Mode = "NULLABLE"
  116. val := jsonutils.NewArray()
  117. for _, item := range items {
  118. obj, err := nf.Parse(item, "")
  119. if err != nil {
  120. return nil, errors.Wrap(err, "Parse items")
  121. }
  122. val.Add(jsonutils.Marshal(obj))
  123. }
  124. ret[prefix] = val
  125. } else {
  126. items, err := r.GetArray("v", "f")
  127. if err != nil {
  128. return nil, errors.Wrap(err, "GetArray v.f")
  129. }
  130. if len(items) != len(f.Fields) {
  131. return nil, errors.Wrap(errors.ErrServer, "inconsistent items and fields")
  132. }
  133. for i := range f.Fields {
  134. res, err := f.Fields[i].Parse(items[i], prefix)
  135. if err != nil {
  136. return nil, errors.Wrapf(err, "Parse %s", prefix)
  137. }
  138. for k, v := range res {
  139. ret[k] = v
  140. }
  141. }
  142. }
  143. default:
  144. v, err := r.Get("v")
  145. if err != nil {
  146. return nil, errors.Wrap(err, "Get v")
  147. }
  148. ret[prefix] = v
  149. }
  150. return ret, nil
  151. }