expiration_cache.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package cache
  15. import (
  16. "sync"
  17. "time"
  18. "yunion.io/x/log"
  19. "yunion.io/x/pkg/util/clock"
  20. )
  21. // ExpirationCache implements the store interface
  22. // 1. All entries are automatically time stamped on insert
  23. // a. The key is computed based off the original item/keyFunc
  24. // b. The value inserted under that key is the timestamped item
  25. // 2. Expiration happens lazily on read based on the expiration policy
  26. // a. No item can be inserted into the store while we're expiring
  27. // *any* item in the cache.
  28. // 3. Time-stamps are stripped off unexpired entries before return
  29. // Note that the ExpirationCache is inherently slower than a normal
  30. // threadSafeStore because it takes a write lock every time it checks if
  31. // an item has expired.
  32. type ExpirationCache struct {
  33. cacheStorage ThreadSafeStore
  34. keyFunc KeyFunc
  35. clock clock.Clock
  36. expirationPolicy ExpirationPolicy
  37. // expirationLock is a write lock used to guarantee that we don't clobber
  38. // newly inserted objects because of a stale expiration timestamp comparison
  39. expirationLock sync.Mutex
  40. }
  41. // ExpirationPolicy dictates when an object expires. Currently only abstracted out
  42. // so unittests don't rely on the system clock.
  43. type ExpirationPolicy interface {
  44. IsExpired(obj *timestampedEntry) bool
  45. }
  46. // TTLPolicy implements a ttl based ExpirationPolicy.
  47. type TTLPolicy struct {
  48. // >0: Expire entries with an age > ttl
  49. // <=0: Don't expire any entry
  50. Ttl time.Duration
  51. // Clock used to calculate ttl expiration
  52. Clock clock.Clock
  53. }
  54. // IsExpired returns true if the given object is older than the ttl, or it can't
  55. // determine its age.
  56. func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
  57. return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
  58. }
  59. // timestampedEntry is the only type allowed in a ExpirationCache.
  60. type timestampedEntry struct {
  61. obj interface{}
  62. timestamp time.Time
  63. }
  64. // getTimestampedEntry returns the timestampedEntry stored under the given key.
  65. func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
  66. item, _ := c.cacheStorage.Get(key)
  67. if tsEntry, ok := item.(*timestampedEntry); ok {
  68. return tsEntry, true
  69. }
  70. return nil, false
  71. }
  72. // getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
  73. // already expired. It holds a write lock across deletion.
  74. func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
  75. // Prevent all inserts from the time we deem an item as "expired" to when we
  76. // delete it, so an un-expired item doesn't sneak in under the same key, just
  77. // before the Delete.
  78. c.expirationLock.Lock()
  79. defer c.expirationLock.Unlock()
  80. timestampedItem, exists := c.getTimestampedEntry(key)
  81. if !exists {
  82. return nil, false
  83. }
  84. if c.expirationPolicy.IsExpired(timestampedItem) {
  85. log.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
  86. c.cacheStorage.Delete(key)
  87. return nil, false
  88. }
  89. return timestampedItem.obj, true
  90. }
  91. // GetByKey returns the item stored under the key, or sets exists=false.
  92. func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) {
  93. obj, exists := c.getOrExpire(key)
  94. return obj, exists, nil
  95. }
  96. // Get returns unexpired items. It purges the cache of expired items in the
  97. // process.
  98. func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) {
  99. key, err := c.keyFunc(obj)
  100. if err != nil {
  101. return nil, false, KeyError{obj, err}
  102. }
  103. obj, exists := c.getOrExpire(key)
  104. return obj, exists, nil
  105. }
  106. // List retrieves a list of unexpired items. It purges the cache of expired
  107. // items in the process.
  108. func (c *ExpirationCache) List() []interface{} {
  109. items := c.cacheStorage.List()
  110. list := make([]interface{}, 0, len(items))
  111. for _, item := range items {
  112. obj := item.(*timestampedEntry).obj
  113. if key, err := c.keyFunc(obj); err != nil {
  114. list = append(list, obj)
  115. } else if obj, exists := c.getOrExpire(key); exists {
  116. list = append(list, obj)
  117. }
  118. }
  119. return list
  120. }
  121. // ListKeys returns a list of all keys in the expiration cache.
  122. func (c *ExpirationCache) ListKeys() []string {
  123. return c.cacheStorage.ListKeys()
  124. }
  125. // Add timestamps an item and inserts it into the cache, overwriting entries
  126. // that might exist under the same key.
  127. func (c *ExpirationCache) Add(obj interface{}) error {
  128. c.expirationLock.Lock()
  129. defer c.expirationLock.Unlock()
  130. key, err := c.keyFunc(obj)
  131. if err != nil {
  132. return KeyError{obj, err}
  133. }
  134. c.cacheStorage.Add(key, &timestampedEntry{obj, c.clock.Now()})
  135. return nil
  136. }
  137. // Update has not been implemented yet for lack of a use case, so this method
  138. // simply calls `Add`. This effectively refreshes the timestamp.
  139. func (c *ExpirationCache) Update(obj interface{}) error {
  140. return c.Add(obj)
  141. }
  142. // Delete removes an item from the cache.
  143. func (c *ExpirationCache) Delete(obj interface{}) error {
  144. c.expirationLock.Lock()
  145. defer c.expirationLock.Unlock()
  146. key, err := c.keyFunc(obj)
  147. if err != nil {
  148. return KeyError{obj, err}
  149. }
  150. c.cacheStorage.Delete(key)
  151. return nil
  152. }
  153. // Replace will convert all items in the given list to TimestampedEntries
  154. // before attempting the replace operation. The replace operation will
  155. // delete the contents of the ExpirationCache `c`.
  156. func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
  157. c.expirationLock.Lock()
  158. defer c.expirationLock.Unlock()
  159. items := map[string]interface{}{}
  160. ts := c.clock.Now()
  161. for _, item := range list {
  162. key, err := c.keyFunc(item)
  163. if err != nil {
  164. return KeyError{item, err}
  165. }
  166. items[key] = &timestampedEntry{item, ts}
  167. }
  168. c.cacheStorage.Replace(items, resourceVersion)
  169. return nil
  170. }
  171. // Resync will touch all objects to put them into the processing queue
  172. func (c *ExpirationCache) Resync() error {
  173. return c.cacheStorage.Resync()
  174. }
  175. // NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
  176. func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
  177. return &ExpirationCache{
  178. cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
  179. keyFunc: keyFunc,
  180. clock: clock.RealClock{},
  181. expirationPolicy: &TTLPolicy{ttl, clock.RealClock{}},
  182. }
  183. }