| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package common
- import (
- "context"
- "reflect"
- "strings"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/wait"
- "yunion.io/x/onecloud/pkg/cloudcommon/consts"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/informer"
- )
- type IResourceManager[O lockman.ILockedObject] interface {
- GetKeyword() string
- GetRefreshInterval() time.Duration
- GetStore() IResourceStore[O]
- GetResource(id string) (O, bool)
- SyncOnce() error
- Start(ctx context.Context)
- }
- type IResourceStore[O lockman.ILockedObject] interface {
- GetInformerResourceManager() informer.IResourceManager
- Init() error
- Get(id string) (O, bool)
- GetAll() []O
- GetByPrefix(prefixId string) []O
- Add(obj *jsonutils.JSONDict)
- Update(oldObj, newObj *jsonutils.JSONDict)
- Delete(obj *jsonutils.JSONDict)
- }
- type CommonResourceManager[O lockman.ILockedObject] struct {
- keyword string
- refreshInterval time.Duration
- store IResourceStore[O]
- }
- func NewCommonResourceManager[O lockman.ILockedObject](
- keyword string,
- refreshInterval time.Duration,
- store IResourceStore[O],
- ) *CommonResourceManager[O] {
- return &CommonResourceManager[O]{
- keyword: keyword,
- refreshInterval: refreshInterval,
- store: store,
- }
- }
- func (m *CommonResourceManager[O]) Start(ctx context.Context) {
- go func() {
- Start[O](ctx, m)
- }()
- }
- func (m *CommonResourceManager[O]) GetKeyword() string {
- return m.keyword
- }
- func (m CommonResourceManager[O]) GetStore() IResourceStore[O] {
- return m.store
- }
- func (m CommonResourceManager[O]) GetResource(id string) (O, bool) {
- return m.store.Get(id)
- }
- func (m CommonResourceManager[O]) GetAll() []O {
- return m.store.GetAll()
- }
- func (m *CommonResourceManager[O]) GetRefreshInterval() time.Duration {
- return m.refreshInterval
- }
- func (m *CommonResourceManager[O]) SyncOnce() error {
- return m.GetStore().Init()
- }
- type FGetDBObject func(man db.IModelManager, id string, obj *jsonutils.JSONDict) (db.IModel, error)
- type ResourceStore[O lockman.ILockedObject] struct {
- dataMap *sync.Map
- modelMan db.IModelManager
- res informer.IResourceManager
- getId func(O) string
- getWatchId func(*jsonutils.JSONDict) string
- getDBObject FGetDBObject
- onAdd func(obj db.IModel)
- onUpdate func(oldObj *jsonutils.JSONDict, newObj db.IModel)
- onDelete func(obj *jsonutils.JSONDict)
- }
- func NewResourceStore[O lockman.ILockedObject](
- modelMan db.IModelManager,
- res informer.IResourceManager,
- ) *ResourceStore[O] {
- return newResourceStore[O](modelMan, res, nil, nil, nil)
- }
- func NewJointResourceStore[O lockman.ILockedObject](
- modelMan db.IModelManager,
- res informer.IResourceManager,
- getId func(O) string,
- getWatchId func(*jsonutils.JSONDict) string,
- getDBObject FGetDBObject,
- ) *ResourceStore[O] {
- return newResourceStore(modelMan, res, getId, getWatchId, getDBObject)
- }
- func newResourceStore[O lockman.ILockedObject](
- modelMan db.IModelManager,
- res informer.IResourceManager,
- getId func(O) string,
- getWatchId func(*jsonutils.JSONDict) string,
- getDBObject FGetDBObject,
- ) *ResourceStore[O] {
- if getId == nil {
- getId = func(o O) string {
- return o.GetId()
- }
- }
- if getWatchId == nil {
- getWatchId = func(o *jsonutils.JSONDict) string {
- id, _ := o.GetString("id")
- return id
- }
- }
- if getDBObject == nil {
- getDBObject = func(man db.IModelManager, id string, o *jsonutils.JSONDict) (db.IModel, error) {
- return man.FetchById(id)
- }
- }
- return &ResourceStore[O]{
- dataMap: new(sync.Map),
- modelMan: modelMan,
- res: res,
- getId: getId,
- getWatchId: getWatchId,
- getDBObject: getDBObject,
- onAdd: nil,
- onUpdate: nil,
- onDelete: nil,
- }
- }
- func (s *ResourceStore[O]) WithOnAdd(onAdd func(db.IModel)) *ResourceStore[O] {
- s.onAdd = onAdd
- return s
- }
- func (s *ResourceStore[O]) WithOnUpdate(onUpdate func(old *jsonutils.JSONDict, newObj db.IModel)) *ResourceStore[O] {
- s.onUpdate = onUpdate
- return s
- }
- func (s *ResourceStore[O]) WithOnDelete(onDelete func(*jsonutils.JSONDict)) *ResourceStore[O] {
- s.onDelete = onDelete
- return s
- }
- func (s *ResourceStore[O]) GetInformerResourceManager() informer.IResourceManager {
- return s.res
- }
- func (s *ResourceStore[O]) Init() error {
- objs := make([]O, 0)
- q := s.modelMan.Query()
- if err := db.FetchModelObjects(s.modelMan, q, &objs); err != nil {
- return err
- }
- for _, obj := range objs {
- s.dataMap.Store(s.getId(obj), obj)
- }
- return nil
- }
- func (s *ResourceStore[O]) Get(id string) (O, bool) {
- obj, ok := s.dataMap.Load(id)
- if !ok {
- var ret O
- return ret, false
- }
- return obj.(O), true
- }
- func (s *ResourceStore[O]) GetByPrefix(prefixId string) []O {
- ret := make([]O, 0)
- s.dataMap.Range(func(key, value any) bool {
- if strings.HasPrefix(key.(string), prefixId) {
- ret = append(ret, value.(O))
- }
- return true
- })
- return ret
- }
- func (s *ResourceStore[O]) GetAll() []O {
- // To avoid repeated slice growth during append, count first and preallocate capacity
- count := 0
- s.dataMap.Range(func(key, value any) bool {
- count++
- return true
- })
- ret := make([]O, 0, count)
- s.dataMap.Range(func(key, value any) bool {
- ret = append(ret, value.(O))
- return true
- })
- return ret
- }
- func (s *ResourceStore[O]) Add(obj *jsonutils.JSONDict) {
- id := s.getWatchId(obj)
- if id != "" {
- dbObj, err := s.getDBObject(s.modelMan, id, obj)
- if err == nil {
- v := reflect.ValueOf(dbObj)
- tmpObj := v.Elem().Interface()
- s.dataMap.Store(id, tmpObj)
- log.Infof("Add %s %s", s.modelMan.Keyword(), obj.String())
- if s.onAdd != nil {
- s.onAdd(dbObj)
- }
- } else {
- log.Errorf("Fetch %s by id %s error when created: %v", s.modelMan.Keyword(), id, err)
- }
- }
- }
- func (s *ResourceStore[O]) removeIgnoreKeys(obj *jsonutils.JSONDict) *jsonutils.JSONDict {
- // ignore keys updated by cloudaccount
- for _, key := range []string{
- "probe_at",
- "update_version",
- "updated_at",
- } {
- obj.Remove(key)
- }
- return obj
- }
- func (s *ResourceStore[O]) Update(oldObj, newObj *jsonutils.JSONDict) {
- id := s.getWatchId(newObj)
- oldObj = s.removeIgnoreKeys(oldObj)
- newObj = s.removeIgnoreKeys(newObj)
- isEq := oldObj.String() == newObj.String()
- if id != "" && !isEq {
- dbObj, err := s.getDBObject(s.modelMan, id, newObj)
- if err == nil {
- v := reflect.ValueOf(dbObj)
- tmpObj := v.Elem().Interface()
- s.dataMap.Store(id, tmpObj)
- log.Infof("Update %s %s", s.modelMan.Keyword(), newObj.String())
- if s.onUpdate != nil {
- s.onUpdate(oldObj, dbObj)
- }
- } else {
- log.Errorf("Fetch %s by id %s error when updated: %v", s.modelMan.Keyword(), id, err)
- }
- }
- }
- func (s *ResourceStore[O]) Delete(obj *jsonutils.JSONDict) {
- id := s.getWatchId(obj)
- if id != "" {
- s.dataMap.Delete(id)
- log.Infof("Delete %s %s", s.modelMan.Keyword(), obj.String())
- if s.onDelete != nil {
- s.onDelete(obj)
- }
- }
- }
- func Start[O lockman.ILockedObject](ctx context.Context, resMan IResourceManager[O]) {
- startWatch(ctx, resMan)
- startSync(resMan)
- }
- func startWatch[O lockman.ILockedObject](ctx context.Context, resMan IResourceManager[O]) {
- s := auth.GetAdminSession(ctx, consts.GetRegion())
- informer.NewWatchManagerBySessionBg(s, func(man *informer.SWatchManager) error {
- res := resMan.GetStore().GetInformerResourceManager()
- if err := man.For(res).AddEventHandler(ctx, newEventHandler(res, resMan)); err != nil {
- return errors.Wrapf(err, "watch resource %s", res.KeyString())
- }
- return nil
- })
- }
- func startSync[O lockman.ILockedObject](resMan IResourceManager[O]) {
- wait.Forever(func() {
- log.Infof("%s data start sync", resMan.GetKeyword())
- startTime := time.Now()
- if err := syncOnce(resMan); err != nil {
- log.Errorf("%s sync data error: %v", resMan.GetKeyword(), err)
- return
- }
- log.Infof("%s finish sync, elapsed %s", resMan.GetKeyword(), time.Since(startTime))
- }, resMan.GetRefreshInterval())
- }
- func syncOnce[O lockman.ILockedObject](resMan IResourceManager[O]) error {
- if err := resMan.SyncOnce(); err != nil {
- return errors.Wrapf(err, "sync once of %s", resMan.GetKeyword())
- }
- return nil
- }
- type eventHandler[O lockman.ILockedObject] struct {
- resMan informer.IResourceManager
- dataMan IResourceManager[O]
- }
- func newEventHandler[O lockman.ILockedObject](resMan informer.IResourceManager, dataMan IResourceManager[O]) informer.EventHandler {
- return &eventHandler[O]{
- resMan: resMan,
- dataMan: dataMan,
- }
- }
- func (e eventHandler[O]) keyword() string {
- return e.resMan.GetKeyword()
- }
- func (e eventHandler[O]) store() IResourceStore[O] {
- return e.dataMan.GetStore()
- }
- func (e eventHandler[O]) OnAdd(obj *jsonutils.JSONDict) {
- log.Debugf("%s [CREATED]: \n%s", e.keyword(), obj.String())
- e.store().Add(obj)
- }
- func (e eventHandler[O]) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
- log.Debugf("%s [UPDATED]: \n[NEW]: %s\n[OLD]: %s", e.keyword(), newObj.String(), oldObj.String())
- e.store().Update(oldObj, newObj)
- }
- func (e eventHandler[O]) OnDelete(obj *jsonutils.JSONDict) {
- log.Debugf("%s [DELETED]: \n%s", e.keyword(), obj.String())
- e.store().Delete(obj)
- }
|