cache.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package cache
  15. import (
  16. "fmt"
  17. "reflect"
  18. "sync"
  19. "time"
  20. "yunion.io/x/log"
  21. expirationcache "yunion.io/x/pkg/util/cache"
  22. "yunion.io/x/pkg/util/wait"
  23. )
  24. var (
  25. normalError = fmt.Errorf("%s", "no need update all")
  26. )
  27. var (
  28. // Full update every 10 minutes(30s * 20), but The first implementation subtracts initialization
  29. fullUpdateHostsCounter = 0
  30. fullUpdateBaremetalsCounter = 0
  31. )
  32. func NewCache(kind string, item CachedItem) Cache {
  33. cache := newSchedulerCache(kind, item)
  34. return cache
  35. }
  36. type schedulerCache struct {
  37. kind string
  38. item CachedItem
  39. cache expirationcache.Store
  40. readyCh chan struct{}
  41. cacheCandidate sync.Map
  42. }
  43. func newSchedulerCache(
  44. kind string,
  45. item CachedItem,
  46. ) *schedulerCache {
  47. return &schedulerCache{
  48. kind: kind,
  49. item: item,
  50. cache: expirationcache.NewTTLStore(item.Key, item.TTL()),
  51. readyCh: make(chan struct{}),
  52. }
  53. }
  54. func (c *schedulerCache) Name() string {
  55. return fmt.Sprintf("%s - %s", c.kind, c.item.Name())
  56. }
  57. func (c *schedulerCache) Get(key string) (interface{}, error) {
  58. value, ok, err := c.cache.GetByKey(key)
  59. if err != nil {
  60. return nil, err
  61. }
  62. if !ok {
  63. log.Infof("Update %s, id: %s", c.Name(), key)
  64. objs, err := c.item.Update([]string{key})
  65. if err != nil {
  66. return nil, err
  67. }
  68. if len(objs) < 1 {
  69. return nil, fmt.Errorf("object %v not found", key)
  70. }
  71. obj := objs[0]
  72. err = c.cache.Add(obj)
  73. if err != nil {
  74. return nil, err
  75. }
  76. return obj, nil
  77. }
  78. return value, nil
  79. }
  80. func (c *schedulerCache) Add(obj interface{}) error {
  81. return c.cache.Add(obj)
  82. }
  83. func (c *schedulerCache) Update(obj interface{}) error {
  84. return c.Add(obj)
  85. }
  86. func (c *schedulerCache) Delete(obj interface{}) error {
  87. return c.cache.Delete(obj)
  88. }
  89. func (c *schedulerCache) List() []interface{} {
  90. return c.cache.List()
  91. }
  92. func (c *schedulerCache) Start(stop <-chan struct{}) {
  93. f := c.updateAllObjects
  94. p := c.item.Period()
  95. go wait.Until(f, p, stop)
  96. }
  97. func (c *schedulerCache) Reload(keys []string) ([]interface{}, error) {
  98. return c.loadObjects(keys)
  99. }
  100. func (c *schedulerCache) ReloadAll() ([]interface{}, error) {
  101. return c.loadObjects(nil)
  102. }
  103. func (c *schedulerCache) WaitForReady() {
  104. readyCh := c.readyCh
  105. if readyCh != nil {
  106. <-c.readyCh
  107. }
  108. }
  109. func (c *schedulerCache) updateAllObjects() {
  110. defer func() {
  111. if c.readyCh != nil {
  112. close(c.readyCh)
  113. c.readyCh = nil
  114. }
  115. }()
  116. // Get the data you need to update.
  117. ids, err := c.item.GetUpdate(c.List())
  118. // if ids is nil and err is nil,than update all.
  119. if len(ids) == 0 && err == nil {
  120. c.loadObjects(nil)
  121. } else if len(ids) == 0 && reflect.DeepEqual(err, normalError) {
  122. // if ids is nil and err is normalError then return.
  123. return
  124. } else if len(ids) > 0 {
  125. log.V(10).Debugf("Update host/baremetal status list: %v", ids)
  126. c.loadObjects(ids)
  127. }
  128. }
  129. func (c *schedulerCache) loadObjects(ids []string) ([]interface{}, error) {
  130. log.Infof("Start load %s, period: %v, ttl: %v", c.Name(), c.item.Period(), c.item.TTL())
  131. startTime := time.Now()
  132. defer func() {
  133. log.Infof("End load %s, elapsed %s", c.Name(), time.Since(startTime))
  134. }()
  135. var (
  136. objects []interface{}
  137. needUpdate map[string]bool
  138. err error
  139. )
  140. if ids == nil {
  141. needUpdate = make(map[string]bool, 0)
  142. c.cacheCandidate.Range(func(key, _ interface{}) bool {
  143. if key != nil && key.(string) != "" {
  144. needUpdate[key.(string)] = true
  145. }
  146. return true
  147. })
  148. objects, err = c.item.Load()
  149. } else {
  150. needUpdate = make(map[string]bool, len(ids))
  151. for _, id := range ids {
  152. if id != "" {
  153. needUpdate[id] = true
  154. }
  155. }
  156. objects, err = c.item.Update(ids)
  157. }
  158. if err != nil {
  159. log.Errorf("Load %s: %v", c.Name(), err)
  160. return nil, err
  161. }
  162. log.V(4).Infof("%v objects loaded", len(objects))
  163. for _, obj := range objects {
  164. // Add the load new data into cache.
  165. err := c.Add(obj)
  166. if err != nil {
  167. log.Errorf("Add %v object to %s cache: %v", obj, c.Name(), err)
  168. continue
  169. }
  170. if id, err := c.item.Key(obj); err == nil {
  171. // If exist the id then the id is valid and we set it to false.
  172. if _, ok := needUpdate[id]; ok {
  173. needUpdate[id] = false
  174. }
  175. // Add or update new data into global cache.
  176. c.cacheCandidate.Store(id, obj)
  177. }
  178. }
  179. // If status is true,then the host must have been deleted.
  180. for id, status := range needUpdate {
  181. if status {
  182. // Load the need delete object and will delete it from chache and scheduler'cache.
  183. object, ok := c.cacheCandidate.Load(id)
  184. if ok {
  185. c.cacheCandidate.Delete(id)
  186. c.Delete(object)
  187. }
  188. }
  189. }
  190. return objects, err
  191. }