sku.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sku
  15. import (
  16. "context"
  17. "fmt"
  18. "sync"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/wait"
  24. "yunion.io/x/sqlchemy"
  25. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  27. "yunion.io/x/onecloud/pkg/compute/models"
  28. "yunion.io/x/onecloud/pkg/mcclient/auth"
  29. "yunion.io/x/onecloud/pkg/mcclient/informer"
  30. "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  31. )
  32. var (
  33. skuManager *SSkuManager
  34. )
  35. func Start(ctx context.Context, refreshInterval time.Duration) {
  36. skuManager = &SSkuManager{
  37. skuMap: newSkuMap(),
  38. refreshInterval: refreshInterval,
  39. }
  40. skuManager.startWatch(ctx)
  41. skuManager.sync()
  42. }
  43. func (m *SSkuManager) startWatch(ctx context.Context) {
  44. s := auth.GetAdminSession(ctx, consts.GetRegion())
  45. informer.NewWatchManagerBySessionBg(s, func(man *informer.SWatchManager) error {
  46. if err := man.For(compute.ServerSkus).AddEventHandler(ctx, newEventHandler(compute.ServerSkus, m)); err != nil {
  47. return errors.Wrapf(err, "watch resource %s", compute.ServerSkus.KeyString())
  48. }
  49. return nil
  50. })
  51. }
  52. func newEventHandler(res informer.IResourceManager, dataMan *SSkuManager) informer.EventHandler {
  53. return &eventHandler{
  54. res: res,
  55. dataMan: dataMan,
  56. }
  57. }
  58. type eventHandler struct {
  59. res informer.IResourceManager
  60. dataMan *SSkuManager
  61. }
  62. func (e eventHandler) keyword() string {
  63. return e.res.GetKeyword()
  64. }
  65. func (e eventHandler) newServerSkuFromJson(obj *jsonutils.JSONDict) (*models.SServerSku, error) {
  66. sku := &models.SServerSku{}
  67. if err := obj.Unmarshal(sku); err != nil {
  68. return nil, errors.Wrapf(err, "unmarshal server sku by: %s", obj.String())
  69. }
  70. return sku, nil
  71. }
  72. func (e eventHandler) newServerSku(obj *jsonutils.JSONDict) (*models.SServerSku, error) {
  73. sku, err := e.newServerSkuFromJson(obj)
  74. if err != nil {
  75. return nil, errors.Wrap(err, "newServerSkuFromJson")
  76. }
  77. if sku.PostpaidStatus == "" {
  78. obj, err := models.ServerSkuManager.FetchById(sku.Id)
  79. if err != nil {
  80. return nil, errors.Wrapf(err, "fetch serversku by id %q", sku.Id)
  81. }
  82. sku = obj.(*models.SServerSku)
  83. }
  84. return sku, nil
  85. }
  86. func isValidServerSku(sku *models.SServerSku) error {
  87. if sku.PrepaidStatus != computeapi.SkuStatusAvailable && sku.PostpaidStatus != computeapi.SkuStatusAvailable {
  88. return errors.Wrapf(errors.ErrInvalidStatus, "sku: %s, prepaid_status: %q, postpaid_status: %q", sku.Name, sku.PrepaidStatus, sku.PostpaidStatus)
  89. }
  90. if !sku.Enabled.IsTrue() {
  91. return errors.Wrapf(errors.ErrInvalidStatus, "sku: %s, enabled: %q", sku.Name, sku.Enabled)
  92. }
  93. return nil
  94. }
  95. func newServerSku(sku *models.SServerSku) *ServerSku {
  96. return &ServerSku{
  97. Id: sku.Id,
  98. Name: sku.Name,
  99. RegionId: sku.CloudregionId,
  100. ZoneId: sku.ZoneId,
  101. Provider: sku.Provider,
  102. }
  103. }
  104. func (e eventHandler) addServerSku(obj *jsonutils.JSONDict) error {
  105. dbSku, err := e.newServerSku(obj)
  106. if err != nil {
  107. return errors.Wrap(err, "new server sku")
  108. }
  109. if err := isValidServerSku(dbSku); err != nil {
  110. return errors.Wrap(err, "invalid server sku")
  111. }
  112. sku := newServerSku(dbSku)
  113. log.Infof("add server sku %s", jsonutils.Marshal(sku).String())
  114. e.dataMan.skuMap.Add(sku.Name, sku)
  115. return nil
  116. }
  117. func (e eventHandler) OnAdd(obj *jsonutils.JSONDict) {
  118. log.Infof("%s [CREATED]: \n%s", e.keyword(), obj.String())
  119. if err := e.addServerSku(obj); err != nil {
  120. log.Errorf("add server sku error: %v", err)
  121. return
  122. }
  123. }
  124. func (e eventHandler) deleteServerSku(sku *ServerSku) error {
  125. e.dataMan.skuMap.Delete(sku.Name, sku)
  126. return nil
  127. }
  128. func (e eventHandler) updateServerSku(newObj *jsonutils.JSONDict) error {
  129. newSku, err := e.newServerSku(newObj)
  130. if err != nil {
  131. return errors.Wrap(err, "new old server sku")
  132. }
  133. shouldDelete := false
  134. err = isValidServerSku(newSku)
  135. if err != nil {
  136. shouldDelete = true
  137. }
  138. sku := newServerSku(newSku)
  139. if shouldDelete {
  140. log.Infof("delete server sku %s when updating, cause of %v", sku.Name, err)
  141. if err := e.deleteServerSku(sku); err != nil {
  142. return errors.Wrap(err, "delete server sku")
  143. }
  144. } else {
  145. if err := e.addServerSku(newObj); err != nil {
  146. return errors.Wrap(err, "add server sku")
  147. }
  148. }
  149. return nil
  150. }
  151. func (e eventHandler) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
  152. log.Infof("%s [UPDATED]: \n[NEW]: %s\n[OLD]: %s", e.keyword(), newObj.String(), oldObj.String())
  153. if err := e.updateServerSku(newObj); err != nil {
  154. log.Errorf("update server sku error: %v", err)
  155. }
  156. }
  157. func (e eventHandler) OnDelete(obj *jsonutils.JSONDict) {
  158. log.Infof("%s [DELETED]: \n%s", e.keyword(), obj.String())
  159. sku, err := e.newServerSkuFromJson(obj)
  160. if err != nil {
  161. log.Errorf("new server sku error: %v", err)
  162. return
  163. }
  164. if err := e.deleteServerSku(newServerSku(sku)); err != nil {
  165. log.Errorf("delete server sku error: %v", err)
  166. }
  167. }
  168. func SyncOnce(wait bool) error {
  169. if skuManager == nil {
  170. return fmt.Errorf("sku manager not init")
  171. }
  172. if wait {
  173. skuManager.syncOnce()
  174. } else {
  175. go skuManager.syncOnce()
  176. }
  177. return nil
  178. }
  179. func GetByZone(instanceType, regionId, zoneId string) *ServerSku {
  180. return skuManager.GetByZone(instanceType, regionId, zoneId)
  181. }
  182. func GetByRegion(instanceType, regionId string) []*ServerSku {
  183. return skuManager.GetByRegion(instanceType, regionId)
  184. }
  185. type skuMap struct {
  186. *sync.Map
  187. }
  188. type ServerSku struct {
  189. Id string `json:"id"`
  190. Name string `json:"name"`
  191. RegionId string `json:"cloudregion_id"`
  192. ZoneId string `json:"zone_id"`
  193. Provider string `json:"provider"`
  194. }
  195. type skuList []*ServerSku
  196. func (l skuList) Has(newSku *ServerSku) (int, bool) {
  197. for i, oldSku := range l {
  198. if oldSku.Id == newSku.Id {
  199. return i, true
  200. }
  201. }
  202. return -1, false
  203. }
  204. func (l skuList) DebugString() string {
  205. return jsonutils.Marshal(l).String()
  206. }
  207. func (l skuList) GetByRegion(regionId string) []*ServerSku {
  208. ret := make([]*ServerSku, 0)
  209. for idx := range l {
  210. sku := l[idx]
  211. if sku.RegionId == regionId {
  212. ret = append(ret, sku)
  213. }
  214. }
  215. return ret
  216. }
  217. func (l skuList) GetByZone(regionId, zoneId string) *ServerSku {
  218. for _, s := range l {
  219. if s.ZoneId == zoneId || (len(s.ZoneId) == 0 && s.RegionId == regionId) {
  220. return s
  221. }
  222. }
  223. return nil
  224. }
  225. func newSkuMap() *skuMap {
  226. return &skuMap{
  227. Map: new(sync.Map),
  228. }
  229. }
  230. func (cache *skuMap) Get(instanceType string) skuList {
  231. value, ok := cache.Load(instanceType)
  232. if ok {
  233. return value.(skuList)
  234. }
  235. return nil
  236. }
  237. func (cache *skuMap) Add(instanceType string, sku *ServerSku) {
  238. skus := cache.Get(instanceType)
  239. if skus == nil {
  240. skus = make([]*ServerSku, 0)
  241. }
  242. skus = append(skus, sku)
  243. cache.Store(instanceType, skuList(skus))
  244. }
  245. func (cache *skuMap) Delete(instanceType string, sku *ServerSku) {
  246. skus := cache.Get(instanceType)
  247. if len(skus) == 0 {
  248. return
  249. }
  250. newSkus := make([]*ServerSku, 0)
  251. for i := range skus {
  252. curSku := skus[i]
  253. if curSku.Id == sku.Id {
  254. log.Infof("delete sku from cache: %s", jsonutils.Marshal(sku))
  255. continue
  256. }
  257. newSkus = append(newSkus, curSku)
  258. }
  259. cache.Store(instanceType, skuList(newSkus))
  260. }
  261. type SSkuManager struct {
  262. // skus cache all server skus in database, key is InstanceType, value is []models.SServerSku
  263. skuMap *skuMap
  264. refreshInterval time.Duration
  265. }
  266. func (m *SSkuManager) syncOnce() {
  267. log.Infof("SkuManager start sync")
  268. startTime := time.Now()
  269. skus := make([]ServerSku, 0)
  270. q := models.ServerSkuManager.Query("id", "name", "cloudregion_id", "zone_id", "provider").IsTrue("enabled")
  271. q = q.Filter(
  272. sqlchemy.OR(
  273. sqlchemy.Equals(q.Field("prepaid_status"), computeapi.SkuStatusAvailable),
  274. sqlchemy.Equals(q.Field("postpaid_status"), computeapi.SkuStatusAvailable)))
  275. if err := q.All(&skus); err != nil {
  276. log.Errorf("SkuManager query all available skus error: %v", err)
  277. return
  278. }
  279. m.skuMap = newSkuMap()
  280. for _, sku := range skus {
  281. tmp := sku
  282. m.skuMap.Add(sku.Name, &tmp)
  283. }
  284. log.Infof("SkuManager end sync, consume %s", time.Since(startTime))
  285. }
  286. func (m *SSkuManager) sync() {
  287. wait.Forever(m.syncOnce, m.refreshInterval)
  288. }
  289. func (m *SSkuManager) GetByZone(instanceType, regionId, zoneId string) *ServerSku {
  290. l := m.skuMap.Get(instanceType)
  291. if l == nil {
  292. return nil
  293. }
  294. return l.GetByZone(regionId, zoneId)
  295. }
  296. func (m *SSkuManager) GetByRegion(instanceType, regionId string) []*ServerSku {
  297. l := m.skuMap.Get(instanceType)
  298. if l == nil {
  299. return nil
  300. }
  301. return l.GetByRegion(regionId)
  302. }