| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package sku
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/wait"
- "yunion.io/x/sqlchemy"
- computeapi "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/cloudcommon/consts"
- "yunion.io/x/onecloud/pkg/compute/models"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/informer"
- "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- )
- var (
- skuManager *SSkuManager
- )
- func Start(ctx context.Context, refreshInterval time.Duration) {
- skuManager = &SSkuManager{
- skuMap: newSkuMap(),
- refreshInterval: refreshInterval,
- }
- skuManager.startWatch(ctx)
- skuManager.sync()
- }
- func (m *SSkuManager) startWatch(ctx context.Context) {
- s := auth.GetAdminSession(ctx, consts.GetRegion())
- informer.NewWatchManagerBySessionBg(s, func(man *informer.SWatchManager) error {
- if err := man.For(compute.ServerSkus).AddEventHandler(ctx, newEventHandler(compute.ServerSkus, m)); err != nil {
- return errors.Wrapf(err, "watch resource %s", compute.ServerSkus.KeyString())
- }
- return nil
- })
- }
- func newEventHandler(res informer.IResourceManager, dataMan *SSkuManager) informer.EventHandler {
- return &eventHandler{
- res: res,
- dataMan: dataMan,
- }
- }
- type eventHandler struct {
- res informer.IResourceManager
- dataMan *SSkuManager
- }
- func (e eventHandler) keyword() string {
- return e.res.GetKeyword()
- }
- func (e eventHandler) newServerSkuFromJson(obj *jsonutils.JSONDict) (*models.SServerSku, error) {
- sku := &models.SServerSku{}
- if err := obj.Unmarshal(sku); err != nil {
- return nil, errors.Wrapf(err, "unmarshal server sku by: %s", obj.String())
- }
- return sku, nil
- }
- func (e eventHandler) newServerSku(obj *jsonutils.JSONDict) (*models.SServerSku, error) {
- sku, err := e.newServerSkuFromJson(obj)
- if err != nil {
- return nil, errors.Wrap(err, "newServerSkuFromJson")
- }
- if sku.PostpaidStatus == "" {
- obj, err := models.ServerSkuManager.FetchById(sku.Id)
- if err != nil {
- return nil, errors.Wrapf(err, "fetch serversku by id %q", sku.Id)
- }
- sku = obj.(*models.SServerSku)
- }
- return sku, nil
- }
- func isValidServerSku(sku *models.SServerSku) error {
- if sku.PrepaidStatus != computeapi.SkuStatusAvailable && sku.PostpaidStatus != computeapi.SkuStatusAvailable {
- return errors.Wrapf(errors.ErrInvalidStatus, "sku: %s, prepaid_status: %q, postpaid_status: %q", sku.Name, sku.PrepaidStatus, sku.PostpaidStatus)
- }
- if !sku.Enabled.IsTrue() {
- return errors.Wrapf(errors.ErrInvalidStatus, "sku: %s, enabled: %q", sku.Name, sku.Enabled)
- }
- return nil
- }
- func newServerSku(sku *models.SServerSku) *ServerSku {
- return &ServerSku{
- Id: sku.Id,
- Name: sku.Name,
- RegionId: sku.CloudregionId,
- ZoneId: sku.ZoneId,
- Provider: sku.Provider,
- }
- }
- func (e eventHandler) addServerSku(obj *jsonutils.JSONDict) error {
- dbSku, err := e.newServerSku(obj)
- if err != nil {
- return errors.Wrap(err, "new server sku")
- }
- if err := isValidServerSku(dbSku); err != nil {
- return errors.Wrap(err, "invalid server sku")
- }
- sku := newServerSku(dbSku)
- log.Infof("add server sku %s", jsonutils.Marshal(sku).String())
- e.dataMan.skuMap.Add(sku.Name, sku)
- return nil
- }
- func (e eventHandler) OnAdd(obj *jsonutils.JSONDict) {
- log.Infof("%s [CREATED]: \n%s", e.keyword(), obj.String())
- if err := e.addServerSku(obj); err != nil {
- log.Errorf("add server sku error: %v", err)
- return
- }
- }
- func (e eventHandler) deleteServerSku(sku *ServerSku) error {
- e.dataMan.skuMap.Delete(sku.Name, sku)
- return nil
- }
- func (e eventHandler) updateServerSku(newObj *jsonutils.JSONDict) error {
- newSku, err := e.newServerSku(newObj)
- if err != nil {
- return errors.Wrap(err, "new old server sku")
- }
- shouldDelete := false
- err = isValidServerSku(newSku)
- if err != nil {
- shouldDelete = true
- }
- sku := newServerSku(newSku)
- if shouldDelete {
- log.Infof("delete server sku %s when updating, cause of %v", sku.Name, err)
- if err := e.deleteServerSku(sku); err != nil {
- return errors.Wrap(err, "delete server sku")
- }
- } else {
- if err := e.addServerSku(newObj); err != nil {
- return errors.Wrap(err, "add server sku")
- }
- }
- return nil
- }
- func (e eventHandler) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
- log.Infof("%s [UPDATED]: \n[NEW]: %s\n[OLD]: %s", e.keyword(), newObj.String(), oldObj.String())
- if err := e.updateServerSku(newObj); err != nil {
- log.Errorf("update server sku error: %v", err)
- }
- }
- func (e eventHandler) OnDelete(obj *jsonutils.JSONDict) {
- log.Infof("%s [DELETED]: \n%s", e.keyword(), obj.String())
- sku, err := e.newServerSkuFromJson(obj)
- if err != nil {
- log.Errorf("new server sku error: %v", err)
- return
- }
- if err := e.deleteServerSku(newServerSku(sku)); err != nil {
- log.Errorf("delete server sku error: %v", err)
- }
- }
- func SyncOnce(wait bool) error {
- if skuManager == nil {
- return fmt.Errorf("sku manager not init")
- }
- if wait {
- skuManager.syncOnce()
- } else {
- go skuManager.syncOnce()
- }
- return nil
- }
- func GetByZone(instanceType, regionId, zoneId string) *ServerSku {
- return skuManager.GetByZone(instanceType, regionId, zoneId)
- }
- func GetByRegion(instanceType, regionId string) []*ServerSku {
- return skuManager.GetByRegion(instanceType, regionId)
- }
- type skuMap struct {
- *sync.Map
- }
- type ServerSku struct {
- Id string `json:"id"`
- Name string `json:"name"`
- RegionId string `json:"cloudregion_id"`
- ZoneId string `json:"zone_id"`
- Provider string `json:"provider"`
- }
- type skuList []*ServerSku
- func (l skuList) Has(newSku *ServerSku) (int, bool) {
- for i, oldSku := range l {
- if oldSku.Id == newSku.Id {
- return i, true
- }
- }
- return -1, false
- }
- func (l skuList) DebugString() string {
- return jsonutils.Marshal(l).String()
- }
- func (l skuList) GetByRegion(regionId string) []*ServerSku {
- ret := make([]*ServerSku, 0)
- for idx := range l {
- sku := l[idx]
- if sku.RegionId == regionId {
- ret = append(ret, sku)
- }
- }
- return ret
- }
- func (l skuList) GetByZone(regionId, zoneId string) *ServerSku {
- for _, s := range l {
- if s.ZoneId == zoneId || (len(s.ZoneId) == 0 && s.RegionId == regionId) {
- return s
- }
- }
- return nil
- }
- func newSkuMap() *skuMap {
- return &skuMap{
- Map: new(sync.Map),
- }
- }
- func (cache *skuMap) Get(instanceType string) skuList {
- value, ok := cache.Load(instanceType)
- if ok {
- return value.(skuList)
- }
- return nil
- }
- func (cache *skuMap) Add(instanceType string, sku *ServerSku) {
- skus := cache.Get(instanceType)
- if skus == nil {
- skus = make([]*ServerSku, 0)
- }
- skus = append(skus, sku)
- cache.Store(instanceType, skuList(skus))
- }
- func (cache *skuMap) Delete(instanceType string, sku *ServerSku) {
- skus := cache.Get(instanceType)
- if len(skus) == 0 {
- return
- }
- newSkus := make([]*ServerSku, 0)
- for i := range skus {
- curSku := skus[i]
- if curSku.Id == sku.Id {
- log.Infof("delete sku from cache: %s", jsonutils.Marshal(sku))
- continue
- }
- newSkus = append(newSkus, curSku)
- }
- cache.Store(instanceType, skuList(newSkus))
- }
- type SSkuManager struct {
- // skus cache all server skus in database, key is InstanceType, value is []models.SServerSku
- skuMap *skuMap
- refreshInterval time.Duration
- }
- func (m *SSkuManager) syncOnce() {
- log.Infof("SkuManager start sync")
- startTime := time.Now()
- skus := make([]ServerSku, 0)
- q := models.ServerSkuManager.Query("id", "name", "cloudregion_id", "zone_id", "provider").IsTrue("enabled")
- q = q.Filter(
- sqlchemy.OR(
- sqlchemy.Equals(q.Field("prepaid_status"), computeapi.SkuStatusAvailable),
- sqlchemy.Equals(q.Field("postpaid_status"), computeapi.SkuStatusAvailable)))
- if err := q.All(&skus); err != nil {
- log.Errorf("SkuManager query all available skus error: %v", err)
- return
- }
- m.skuMap = newSkuMap()
- for _, sku := range skus {
- tmp := sku
- m.skuMap.Add(sku.Name, &tmp)
- }
- log.Infof("SkuManager end sync, consume %s", time.Since(startTime))
- }
- func (m *SSkuManager) sync() {
- wait.Forever(m.syncOnce, m.refreshInterval)
- }
- func (m *SSkuManager) GetByZone(instanceType, regionId, zoneId string) *ServerSku {
- l := m.skuMap.Get(instanceType)
- if l == nil {
- return nil
- }
- return l.GetByZone(regionId, zoneId)
- }
- func (m *SSkuManager) GetByRegion(instanceType, regionId string) []*ServerSku {
- l := m.skuMap.Get(instanceType)
- if l == nil {
- return nil
- }
- return l.GetByRegion(regionId)
- }
|