common.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package common
  15. import (
  16. "context"
  17. "reflect"
  18. "strings"
  19. "sync"
  20. "time"
  21. "yunion.io/x/jsonutils"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/util/wait"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  28. "yunion.io/x/onecloud/pkg/mcclient/auth"
  29. "yunion.io/x/onecloud/pkg/mcclient/informer"
  30. )
  31. type IResourceManager[O lockman.ILockedObject] interface {
  32. GetKeyword() string
  33. GetRefreshInterval() time.Duration
  34. GetStore() IResourceStore[O]
  35. GetResource(id string) (O, bool)
  36. SyncOnce() error
  37. Start(ctx context.Context)
  38. }
  39. type IResourceStore[O lockman.ILockedObject] interface {
  40. GetInformerResourceManager() informer.IResourceManager
  41. Init() error
  42. Get(id string) (O, bool)
  43. GetAll() []O
  44. GetByPrefix(prefixId string) []O
  45. Add(obj *jsonutils.JSONDict)
  46. Update(oldObj, newObj *jsonutils.JSONDict)
  47. Delete(obj *jsonutils.JSONDict)
  48. }
  49. type CommonResourceManager[O lockman.ILockedObject] struct {
  50. keyword string
  51. refreshInterval time.Duration
  52. store IResourceStore[O]
  53. }
  54. func NewCommonResourceManager[O lockman.ILockedObject](
  55. keyword string,
  56. refreshInterval time.Duration,
  57. store IResourceStore[O],
  58. ) *CommonResourceManager[O] {
  59. return &CommonResourceManager[O]{
  60. keyword: keyword,
  61. refreshInterval: refreshInterval,
  62. store: store,
  63. }
  64. }
  65. func (m *CommonResourceManager[O]) Start(ctx context.Context) {
  66. go func() {
  67. Start[O](ctx, m)
  68. }()
  69. }
  70. func (m *CommonResourceManager[O]) GetKeyword() string {
  71. return m.keyword
  72. }
  73. func (m CommonResourceManager[O]) GetStore() IResourceStore[O] {
  74. return m.store
  75. }
  76. func (m CommonResourceManager[O]) GetResource(id string) (O, bool) {
  77. return m.store.Get(id)
  78. }
  79. func (m CommonResourceManager[O]) GetAll() []O {
  80. return m.store.GetAll()
  81. }
  82. func (m *CommonResourceManager[O]) GetRefreshInterval() time.Duration {
  83. return m.refreshInterval
  84. }
  85. func (m *CommonResourceManager[O]) SyncOnce() error {
  86. return m.GetStore().Init()
  87. }
  88. type FGetDBObject func(man db.IModelManager, id string, obj *jsonutils.JSONDict) (db.IModel, error)
  89. type ResourceStore[O lockman.ILockedObject] struct {
  90. dataMap *sync.Map
  91. modelMan db.IModelManager
  92. res informer.IResourceManager
  93. getId func(O) string
  94. getWatchId func(*jsonutils.JSONDict) string
  95. getDBObject FGetDBObject
  96. onAdd func(obj db.IModel)
  97. onUpdate func(oldObj *jsonutils.JSONDict, newObj db.IModel)
  98. onDelete func(obj *jsonutils.JSONDict)
  99. }
  100. func NewResourceStore[O lockman.ILockedObject](
  101. modelMan db.IModelManager,
  102. res informer.IResourceManager,
  103. ) *ResourceStore[O] {
  104. return newResourceStore[O](modelMan, res, nil, nil, nil)
  105. }
  106. func NewJointResourceStore[O lockman.ILockedObject](
  107. modelMan db.IModelManager,
  108. res informer.IResourceManager,
  109. getId func(O) string,
  110. getWatchId func(*jsonutils.JSONDict) string,
  111. getDBObject FGetDBObject,
  112. ) *ResourceStore[O] {
  113. return newResourceStore(modelMan, res, getId, getWatchId, getDBObject)
  114. }
  115. func newResourceStore[O lockman.ILockedObject](
  116. modelMan db.IModelManager,
  117. res informer.IResourceManager,
  118. getId func(O) string,
  119. getWatchId func(*jsonutils.JSONDict) string,
  120. getDBObject FGetDBObject,
  121. ) *ResourceStore[O] {
  122. if getId == nil {
  123. getId = func(o O) string {
  124. return o.GetId()
  125. }
  126. }
  127. if getWatchId == nil {
  128. getWatchId = func(o *jsonutils.JSONDict) string {
  129. id, _ := o.GetString("id")
  130. return id
  131. }
  132. }
  133. if getDBObject == nil {
  134. getDBObject = func(man db.IModelManager, id string, o *jsonutils.JSONDict) (db.IModel, error) {
  135. return man.FetchById(id)
  136. }
  137. }
  138. return &ResourceStore[O]{
  139. dataMap: new(sync.Map),
  140. modelMan: modelMan,
  141. res: res,
  142. getId: getId,
  143. getWatchId: getWatchId,
  144. getDBObject: getDBObject,
  145. onAdd: nil,
  146. onUpdate: nil,
  147. onDelete: nil,
  148. }
  149. }
  150. func (s *ResourceStore[O]) WithOnAdd(onAdd func(db.IModel)) *ResourceStore[O] {
  151. s.onAdd = onAdd
  152. return s
  153. }
  154. func (s *ResourceStore[O]) WithOnUpdate(onUpdate func(old *jsonutils.JSONDict, newObj db.IModel)) *ResourceStore[O] {
  155. s.onUpdate = onUpdate
  156. return s
  157. }
  158. func (s *ResourceStore[O]) WithOnDelete(onDelete func(*jsonutils.JSONDict)) *ResourceStore[O] {
  159. s.onDelete = onDelete
  160. return s
  161. }
  162. func (s *ResourceStore[O]) GetInformerResourceManager() informer.IResourceManager {
  163. return s.res
  164. }
  165. func (s *ResourceStore[O]) Init() error {
  166. objs := make([]O, 0)
  167. q := s.modelMan.Query()
  168. if err := db.FetchModelObjects(s.modelMan, q, &objs); err != nil {
  169. return err
  170. }
  171. for _, obj := range objs {
  172. s.dataMap.Store(s.getId(obj), obj)
  173. }
  174. return nil
  175. }
  176. func (s *ResourceStore[O]) Get(id string) (O, bool) {
  177. obj, ok := s.dataMap.Load(id)
  178. if !ok {
  179. var ret O
  180. return ret, false
  181. }
  182. return obj.(O), true
  183. }
  184. func (s *ResourceStore[O]) GetByPrefix(prefixId string) []O {
  185. ret := make([]O, 0)
  186. s.dataMap.Range(func(key, value any) bool {
  187. if strings.HasPrefix(key.(string), prefixId) {
  188. ret = append(ret, value.(O))
  189. }
  190. return true
  191. })
  192. return ret
  193. }
  194. func (s *ResourceStore[O]) GetAll() []O {
  195. // To avoid repeated slice growth during append, count first and preallocate capacity
  196. count := 0
  197. s.dataMap.Range(func(key, value any) bool {
  198. count++
  199. return true
  200. })
  201. ret := make([]O, 0, count)
  202. s.dataMap.Range(func(key, value any) bool {
  203. ret = append(ret, value.(O))
  204. return true
  205. })
  206. return ret
  207. }
  208. func (s *ResourceStore[O]) Add(obj *jsonutils.JSONDict) {
  209. id := s.getWatchId(obj)
  210. if id != "" {
  211. dbObj, err := s.getDBObject(s.modelMan, id, obj)
  212. if err == nil {
  213. v := reflect.ValueOf(dbObj)
  214. tmpObj := v.Elem().Interface()
  215. s.dataMap.Store(id, tmpObj)
  216. log.Infof("Add %s %s", s.modelMan.Keyword(), obj.String())
  217. if s.onAdd != nil {
  218. s.onAdd(dbObj)
  219. }
  220. } else {
  221. log.Errorf("Fetch %s by id %s error when created: %v", s.modelMan.Keyword(), id, err)
  222. }
  223. }
  224. }
  225. func (s *ResourceStore[O]) removeIgnoreKeys(obj *jsonutils.JSONDict) *jsonutils.JSONDict {
  226. // ignore keys updated by cloudaccount
  227. for _, key := range []string{
  228. "probe_at",
  229. "update_version",
  230. "updated_at",
  231. } {
  232. obj.Remove(key)
  233. }
  234. return obj
  235. }
  236. func (s *ResourceStore[O]) Update(oldObj, newObj *jsonutils.JSONDict) {
  237. id := s.getWatchId(newObj)
  238. oldObj = s.removeIgnoreKeys(oldObj)
  239. newObj = s.removeIgnoreKeys(newObj)
  240. isEq := oldObj.String() == newObj.String()
  241. if id != "" && !isEq {
  242. dbObj, err := s.getDBObject(s.modelMan, id, newObj)
  243. if err == nil {
  244. v := reflect.ValueOf(dbObj)
  245. tmpObj := v.Elem().Interface()
  246. s.dataMap.Store(id, tmpObj)
  247. log.Infof("Update %s %s", s.modelMan.Keyword(), newObj.String())
  248. if s.onUpdate != nil {
  249. s.onUpdate(oldObj, dbObj)
  250. }
  251. } else {
  252. log.Errorf("Fetch %s by id %s error when updated: %v", s.modelMan.Keyword(), id, err)
  253. }
  254. }
  255. }
  256. func (s *ResourceStore[O]) Delete(obj *jsonutils.JSONDict) {
  257. id := s.getWatchId(obj)
  258. if id != "" {
  259. s.dataMap.Delete(id)
  260. log.Infof("Delete %s %s", s.modelMan.Keyword(), obj.String())
  261. if s.onDelete != nil {
  262. s.onDelete(obj)
  263. }
  264. }
  265. }
  266. func Start[O lockman.ILockedObject](ctx context.Context, resMan IResourceManager[O]) {
  267. startWatch(ctx, resMan)
  268. startSync(resMan)
  269. }
  270. func startWatch[O lockman.ILockedObject](ctx context.Context, resMan IResourceManager[O]) {
  271. s := auth.GetAdminSession(ctx, consts.GetRegion())
  272. informer.NewWatchManagerBySessionBg(s, func(man *informer.SWatchManager) error {
  273. res := resMan.GetStore().GetInformerResourceManager()
  274. if err := man.For(res).AddEventHandler(ctx, newEventHandler(res, resMan)); err != nil {
  275. return errors.Wrapf(err, "watch resource %s", res.KeyString())
  276. }
  277. return nil
  278. })
  279. }
  280. func startSync[O lockman.ILockedObject](resMan IResourceManager[O]) {
  281. wait.Forever(func() {
  282. log.Infof("%s data start sync", resMan.GetKeyword())
  283. startTime := time.Now()
  284. if err := syncOnce(resMan); err != nil {
  285. log.Errorf("%s sync data error: %v", resMan.GetKeyword(), err)
  286. return
  287. }
  288. log.Infof("%s finish sync, elapsed %s", resMan.GetKeyword(), time.Since(startTime))
  289. }, resMan.GetRefreshInterval())
  290. }
  291. func syncOnce[O lockman.ILockedObject](resMan IResourceManager[O]) error {
  292. if err := resMan.SyncOnce(); err != nil {
  293. return errors.Wrapf(err, "sync once of %s", resMan.GetKeyword())
  294. }
  295. return nil
  296. }
  297. type eventHandler[O lockman.ILockedObject] struct {
  298. resMan informer.IResourceManager
  299. dataMan IResourceManager[O]
  300. }
  301. func newEventHandler[O lockman.ILockedObject](resMan informer.IResourceManager, dataMan IResourceManager[O]) informer.EventHandler {
  302. return &eventHandler[O]{
  303. resMan: resMan,
  304. dataMan: dataMan,
  305. }
  306. }
  307. func (e eventHandler[O]) keyword() string {
  308. return e.resMan.GetKeyword()
  309. }
  310. func (e eventHandler[O]) store() IResourceStore[O] {
  311. return e.dataMan.GetStore()
  312. }
  313. func (e eventHandler[O]) OnAdd(obj *jsonutils.JSONDict) {
  314. log.Debugf("%s [CREATED]: \n%s", e.keyword(), obj.String())
  315. e.store().Add(obj)
  316. }
  317. func (e eventHandler[O]) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
  318. log.Debugf("%s [UPDATED]: \n[NEW]: %s\n[OLD]: %s", e.keyword(), newObj.String(), oldObj.String())
  319. e.store().Update(oldObj, newObj)
  320. }
  321. func (e eventHandler[O]) OnDelete(obj *jsonutils.JSONDict) {
  322. log.Debugf("%s [DELETED]: \n%s", e.keyword(), obj.String())
  323. e.store().Delete(obj)
  324. }