reflect.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package apihelper
  15. import (
  16. "fmt"
  17. "reflect"
  18. "time"
  19. "yunion.io/x/jsonutils"
  20. "yunion.io/x/log"
  21. "yunion.io/x/pkg/errors"
  22. "yunion.io/x/pkg/util/printutils"
  23. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  24. mcclient "yunion.io/x/onecloud/pkg/mcclient"
  25. mcclient_modulebase "yunion.io/x/onecloud/pkg/mcclient/modulebase"
  26. "yunion.io/x/onecloud/pkg/mcclient/options"
  27. )
  28. // A hack to workaround the IsZero() in timeutils.Utcify. This depends on the
  29. // fact that database time has a resolution of 1-second
  30. var PseudoZeroTime = time.Time{}.Add(time.Nanosecond)
  31. type GetModelsOptions struct {
  32. ClientSession *mcclient.ClientSession
  33. ModelManager mcclient_modulebase.IBaseManager
  34. ModelSet IModelSet
  35. BatchListSize int
  36. IncludeDetails bool
  37. IncludeEmulated bool
  38. InCludeOtherCloudEnv bool
  39. }
  40. func GetModels(opts *GetModelsOptions) error {
  41. man := opts.ModelManager
  42. manKeyPlural := man.KeyString()
  43. minUpdatedAt := PseudoZeroTime
  44. setNextListParams := func(params *jsonutils.JSONDict, lastResult *printutils.ListResult) error {
  45. // NOTE: the updated_at field has second-level resolution.
  46. // If they all have the same date...
  47. params.Set("offset", jsonutils.NewInt(int64(lastResult.Offset+len(lastResult.Data))))
  48. return nil
  49. }
  50. listOptions := options.BaseListOptions{
  51. System: options.Bool(true),
  52. Admin: options.Bool(true),
  53. Scope: "system",
  54. Details: options.Bool(opts.IncludeDetails),
  55. ShowEmulated: options.Bool(opts.IncludeEmulated),
  56. Filter: []string{},
  57. OrderBy: []string{"updated_at", "created_at", "id"},
  58. Order: "asc",
  59. Limit: options.Int(opts.BatchListSize),
  60. Offset: options.Int(0),
  61. }
  62. if !opts.InCludeOtherCloudEnv {
  63. listOptions.Filter = append(listOptions.Filter,
  64. "manager_id.isnullorempty()", // len(manager_id) > 0 is for pubcloud objects
  65. // "external_id.isnullorempty()", // len(external_id) > 0 is for pubcloud objects
  66. )
  67. listOptions.CloudEnv = "onpremise"
  68. // listOptions.Provider = []string{"OneCloud"}
  69. }
  70. if inter, ok := opts.ModelSet.(IModelSetFilter); ok {
  71. filter := inter.ModelFilter()
  72. listOptions.Filter = append(listOptions.Filter, filter...)
  73. }
  74. if !minUpdatedAt.Equal(PseudoZeroTime) {
  75. // Only fetching pending deletes when we are doing incremental fetch
  76. listOptions.PendingDeleteAll = options.Bool(true)
  77. listOptions.DeleteAll = options.Bool(true)
  78. }
  79. params, err := listOptions.Params()
  80. if err != nil {
  81. return fmt.Errorf("%s: making list params: %s", manKeyPlural, err)
  82. }
  83. if inter, ok := opts.ModelSet.(IModelListParam); ok {
  84. filter := inter.ModelParamFilter()
  85. params.Update(filter)
  86. }
  87. if inter, ok := opts.ModelSet.(IModelListSetParams); ok {
  88. params = inter.SetModelListParams(params)
  89. }
  90. //XXX
  91. //params.Set(api.LBAGENT_QUERY_ORIG_KEY, jsonutils.NewString(api.LBAGENT_QUERY_ORIG_VAL))
  92. entriesJson := []jsonutils.JSONObject{}
  93. for {
  94. var err error
  95. listResult, err := opts.ModelManager.List(opts.ClientSession, params)
  96. if err != nil {
  97. log.Errorf("%s: list failed with updated_at.gt('%s'): %s", manKeyPlural, minUpdatedAt, err)
  98. return errors.Wrapf(err, "%s list failed with params: %s", manKeyPlural, params.QueryString())
  99. }
  100. entriesJson = append(entriesJson, listResult.Data...)
  101. if listResult.Offset+len(listResult.Data) >= listResult.Total {
  102. break
  103. }
  104. err = setNextListParams(params, listResult)
  105. if err != nil {
  106. return fmt.Errorf("%s: %s", manKeyPlural, err)
  107. }
  108. }
  109. {
  110. err := InitializeModelSetFromJSON(opts.ModelSet, entriesJson)
  111. if err != nil {
  112. return fmt.Errorf("%s: initializing model set failed: %s",
  113. manKeyPlural, err)
  114. }
  115. }
  116. return nil
  117. }
  118. func InitializeModelSetFromJSON(set IModelSet, entriesJson []jsonutils.JSONObject) error {
  119. setRv := reflect.ValueOf(set)
  120. for _, kRv := range setRv.MapKeys() {
  121. zRv := reflect.Value{}
  122. setRv.SetMapIndex(kRv, zRv)
  123. }
  124. manKeyPlural := set.ModelManager().KeyString()
  125. for _, entryJson := range entriesJson {
  126. m := set.NewModel()
  127. err := entryJson.Unmarshal(m)
  128. if err != nil {
  129. return fmt.Errorf("%s: unmarshal: %v: %s", manKeyPlural, err, entryJson.String())
  130. }
  131. {
  132. keyRv := reflect.ValueOf(m.GetId())
  133. oldMRv := setRv.MapIndex(keyRv)
  134. if oldMRv.IsValid() {
  135. // check version
  136. oldM := oldMRv.Interface().(db.IModel)
  137. oldVersion := oldM.GetUpdateVersion()
  138. version := m.GetUpdateVersion()
  139. if oldVersion > version {
  140. oldUpdatedAt := oldM.GetUpdatedAt()
  141. updatedAt := m.GetUpdatedAt()
  142. log.Warningf("prefer loadbalancer with update_version %d(%s) to %d(%s)",
  143. oldVersion, oldUpdatedAt, version, updatedAt)
  144. return nil
  145. }
  146. }
  147. }
  148. set.AddModel(m)
  149. }
  150. return nil
  151. }
  152. func ModelSetMaxUpdatedAt(set IModelSet) time.Time {
  153. r := PseudoZeroTime
  154. setRv := reflect.ValueOf(set)
  155. for _, kRv := range setRv.MapKeys() {
  156. mRv := setRv.MapIndex(kRv)
  157. m := mRv.Interface().(db.IModel)
  158. updatedAt := m.GetUpdatedAt()
  159. if r.Before(updatedAt) {
  160. r = updatedAt
  161. }
  162. }
  163. return r
  164. }
  165. type ModelSetUpdateResult struct {
  166. Changed bool
  167. MaxUpdatedAt time.Time
  168. }
  169. // ModelSetApplyUpdates applies bSet to aSet.
  170. //
  171. // - PendingDeleted in bSet are removed from aSet
  172. // - Newer models in bSet are updated in aSet
  173. func ModelSetApplyUpdates(aSet, bSet IModelSet) *ModelSetUpdateResult {
  174. r := &ModelSetUpdateResult{
  175. Changed: false,
  176. }
  177. {
  178. a := ModelSetMaxUpdatedAt(aSet)
  179. b := ModelSetMaxUpdatedAt(bSet)
  180. if b.After(a) {
  181. r.MaxUpdatedAt = b
  182. } else {
  183. r.MaxUpdatedAt = a
  184. }
  185. }
  186. aSetRv := reflect.ValueOf(aSet)
  187. bSetRv := reflect.ValueOf(bSet)
  188. for _, kRv := range bSetRv.MapKeys() {
  189. bMRv := bSetRv.MapIndex(kRv)
  190. b := bMRv.Interface()
  191. bM := b.(db.IModel)
  192. bGone := bM.GetDeleted()
  193. if !bGone {
  194. bVM, ok := b.(db.IPendingDeletable)
  195. if ok {
  196. bGone = bVM.GetPendingDeleted()
  197. }
  198. }
  199. aMRv := aSetRv.MapIndex(kRv)
  200. if aMRv.IsValid() {
  201. aM := aMRv.Interface().(db.IModel)
  202. if bGone {
  203. // oops, deleted
  204. aSetRv.SetMapIndex(kRv, reflect.Value{})
  205. r.Changed = true
  206. continue
  207. }
  208. if aM.GetUpdateVersion() < bM.GetUpdateVersion() {
  209. // oops, updated
  210. aSetRv.SetMapIndex(kRv, bMRv)
  211. r.Changed = true
  212. continue
  213. }
  214. } else {
  215. if bGone {
  216. // hmm, gone before even knowning
  217. continue
  218. }
  219. // oops, new member
  220. aSetRv.SetMapIndex(kRv, bMRv)
  221. r.Changed = true
  222. }
  223. }
  224. for _, kRv := range aSetRv.MapKeys() {
  225. bMRv := bSetRv.MapIndex(kRv)
  226. if !bMRv.IsValid() { // alread deleted
  227. aSetRv.SetMapIndex(kRv, reflect.Value{})
  228. r.Changed = true
  229. }
  230. }
  231. return r
  232. }