fifo.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package cache
  15. import (
  16. "errors"
  17. "sync"
  18. "yunion.io/x/pkg/util/sets"
  19. )
  // PopProcessFunc is passed to the Pop() method of the Queue interface.
  // It is supposed to process the element popped from the queue; the error it
  // returns is handed back to the caller of Pop.
  type PopProcessFunc func(interface{}) error
  // ErrRequeue may be returned by a PopProcessFunc to safely requeue
  // the current item. The value of Err will be returned from Pop.
  type ErrRequeue struct {
  	// Err is the error that Pop reports to its caller; it may be nil.
  	Err error
  }

  // FIFOClosedError is the sentinel returned by Pop when the queue is empty
  // and has been closed.
  var FIFOClosedError = errors.New("DeltaFIFO: manipulating with closed queue")

  // Error implements the error interface. It forwards to the wrapped error
  // when there is one and otherwise returns a fixed explanatory message.
  func (e ErrRequeue) Error() string {
  	if e.Err != nil {
  		return e.Err.Error()
  	}
  	return "the popped item should be requeued without returning an error"
  }
  // Queue is exactly like a Store, but has a Pop() method too.
  type Queue interface {
  	Store

  	// Pop blocks until it has something to process.
  	// It returns the object that was processed and the result of processing.
  	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
  	// should be requeued before releasing the lock on the queue.
  	Pop(PopProcessFunc) (interface{}, error)

  	// AddIfNotPresent adds a value previously
  	// returned by Pop back into the queue as long
  	// as nothing else (presumably more recent)
  	// has since been added.
  	AddIfNotPresent(interface{}) error

  	// HasSynced returns true if the first batch of items has been popped.
  	HasSynced() bool

  	// Close closes the queue.
  	Close()
  }
  54. // Helper function for popping from Queue.
  55. // WARNING: Do NOT use this function in non-test code to avoid races
  56. // unless you really really really really know what you are doing.
  57. func Pop(queue Queue) interface{} {
  58. var result interface{}
  59. queue.Pop(func(obj interface{}) error {
  60. result = obj
  61. return nil
  62. })
  63. return result
  64. }
  // FIFO receives adds and updates from a Reflector, and puts them in a queue for
  // FIFO order processing. If multiple adds/updates of a single item happen while
  // an item is in the queue before it has been processed, it will only be
  // processed once, and when it is processed, the most recent version will be
  // processed. This can't be done with a channel.
  //
  // FIFO solves this use case:
  //   - You want to process every object (exactly) once.
  //   - You want to process the most recent version of the object when you process it.
  //   - You do not want to process deleted objects, they should be removed from the queue.
  //   - You do not want to periodically reprocess objects.
  //
  // Compare with DeltaFIFO for other use cases.
  type FIFO struct {
  	// lock is held by every method that reads or writes items, queue,
  	// populated or initialPopulationCount.
  	lock sync.RWMutex
  	// cond is used by Pop to wait for new items; NewFIFO sets cond.L to &lock.
  	cond sync.Cond
  	// We depend on the property that items in the set are in the queue and vice versa.
  	items map[string]interface{}
  	// queue holds the keys of pending items in processing order.
  	queue []string

  	// populated is true if the first batch of items inserted by Replace() has been populated
  	// or Delete/Add/Update was called first.
  	populated bool
  	// initialPopulationCount is the number of items inserted by the first call of Replace()
  	initialPopulationCount int

  	// keyFunc is used to make the key used for queued item insertion and retrieval, and
  	// should be deterministic.
  	keyFunc KeyFunc

  	// Indication the queue is closed.
  	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  	// Currently, not used to gate any of CRUD operations.
  	closed bool
  	// closedLock guards closed.
  	closedLock sync.Mutex
  }
  97. var (
  98. _ = Queue(&FIFO{}) // FIFO is a Queue
  99. )
  100. // Close the queue.
  101. func (f *FIFO) Close() {
  102. f.closedLock.Lock()
  103. defer f.closedLock.Unlock()
  104. f.closed = true
  105. f.cond.Broadcast()
  106. }
  107. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  108. // or an Update called first but the first batch of items inserted by Replace() has been popped
  109. func (f *FIFO) HasSynced() bool {
  110. f.lock.Lock()
  111. defer f.lock.Unlock()
  112. return f.populated && f.initialPopulationCount == 0
  113. }
  114. // Add inserts an item, and puts it in the queue. The item is only enqueued
  115. // if it doesn't already exist in the set.
  116. func (f *FIFO) Add(obj interface{}) error {
  117. id, err := f.keyFunc(obj)
  118. if err != nil {
  119. return KeyError{obj, err}
  120. }
  121. f.lock.Lock()
  122. defer f.lock.Unlock()
  123. f.populated = true
  124. if _, exists := f.items[id]; !exists {
  125. f.queue = append(f.queue, id)
  126. }
  127. f.items[id] = obj
  128. f.cond.Broadcast()
  129. return nil
  130. }
  131. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  132. // present in the set, it is neither enqueued nor added to the set.
  133. //
  134. // This is useful in a single producer/consumer scenario so that the consumer can
  135. // safely retry items without contending with the producer and potentially enqueueing
  136. // stale items.
  137. func (f *FIFO) AddIfNotPresent(obj interface{}) error {
  138. id, err := f.keyFunc(obj)
  139. if err != nil {
  140. return KeyError{obj, err}
  141. }
  142. f.lock.Lock()
  143. defer f.lock.Unlock()
  144. f.addIfNotPresent(id, obj)
  145. return nil
  146. }
  147. // addIfNotPresent assumes the fifo lock is already held and adds the provided
  148. // item to the queue under id if it does not already exist.
  149. func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
  150. f.populated = true
  151. if _, exists := f.items[id]; exists {
  152. return
  153. }
  154. f.queue = append(f.queue, id)
  155. f.items[id] = obj
  156. f.cond.Broadcast()
  157. }
  // Update is the same as Add in this implementation: it delegates to Add and
  // returns whatever Add returns.
  func (f *FIFO) Update(obj interface{}) error {
  	return f.Add(obj)
  }
  162. // Delete removes an item. It doesn't add it to the queue, because
  163. // this implementation assumes the consumer only cares about the objects,
  164. // not the order in which they were created/added.
  165. func (f *FIFO) Delete(obj interface{}) error {
  166. id, err := f.keyFunc(obj)
  167. if err != nil {
  168. return KeyError{obj, err}
  169. }
  170. f.lock.Lock()
  171. defer f.lock.Unlock()
  172. f.populated = true
  173. delete(f.items, id)
  174. return err
  175. }
  176. // List returns a list of all the items.
  177. func (f *FIFO) List() []interface{} {
  178. f.lock.RLock()
  179. defer f.lock.RUnlock()
  180. list := make([]interface{}, 0, len(f.items))
  181. for _, item := range f.items {
  182. list = append(list, item)
  183. }
  184. return list
  185. }
  186. // ListKeys returns a list of all the keys of the objects currently
  187. // in the FIFO.
  188. func (f *FIFO) ListKeys() []string {
  189. f.lock.RLock()
  190. defer f.lock.RUnlock()
  191. list := make([]string, 0, len(f.items))
  192. for key := range f.items {
  193. list = append(list, key)
  194. }
  195. return list
  196. }
  197. // Get returns the requested item, or sets exists=false.
  198. func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  199. key, err := f.keyFunc(obj)
  200. if err != nil {
  201. return nil, false, KeyError{obj, err}
  202. }
  203. return f.GetByKey(key)
  204. }
  205. // GetByKey returns the requested item, or sets exists=false.
  206. func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  207. f.lock.RLock()
  208. defer f.lock.RUnlock()
  209. item, exists = f.items[key]
  210. return item, exists, nil
  211. }
  212. // Checks if the queue is closed
  213. func (f *FIFO) IsClosed() bool {
  214. f.closedLock.Lock()
  215. defer f.closedLock.Unlock()
  216. if f.closed {
  217. return true
  218. }
  219. return false
  220. }
  221. // Pop waits until an item is ready and processes it. If multiple items are
  222. // ready, they are returned in the order in which they were added/updated.
  223. // The item is removed from the queue (and the store) before it is processed,
  224. // so if you don't successfully process it, it should be added back with
  225. // AddIfNotPresent(). process function is called under lock, so it is safe
  226. // update data structures in it that need to be in sync with the queue.
  227. func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
  228. f.lock.Lock()
  229. defer f.lock.Unlock()
  230. for {
  231. for len(f.queue) == 0 {
  232. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  233. // When Close() is called, the f.closed is set and the condition is broadcasted.
  234. // Which causes this loop to continue and return from the Pop().
  235. if f.IsClosed() {
  236. return nil, FIFOClosedError
  237. }
  238. f.cond.Wait()
  239. }
  240. id := f.queue[0]
  241. f.queue = f.queue[1:]
  242. if f.initialPopulationCount > 0 {
  243. f.initialPopulationCount--
  244. }
  245. item, ok := f.items[id]
  246. if !ok {
  247. // Item may have been deleted subsequently.
  248. continue
  249. }
  250. delete(f.items, id)
  251. err := process(item)
  252. if e, ok := err.(ErrRequeue); ok {
  253. f.addIfNotPresent(id, item)
  254. err = e.Err
  255. }
  256. return item, err
  257. }
  258. }
  259. // Replace will delete the contents of 'f', using instead the given map.
  260. // 'f' takes ownership of the map, you should not reference the map again
  261. // after calling this function. f's queue is reset, too; upon return, it
  262. // will contain the items in the map, in no particular order.
  263. func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
  264. items := map[string]interface{}{}
  265. for _, item := range list {
  266. key, err := f.keyFunc(item)
  267. if err != nil {
  268. return KeyError{item, err}
  269. }
  270. items[key] = item
  271. }
  272. f.lock.Lock()
  273. defer f.lock.Unlock()
  274. if !f.populated {
  275. f.populated = true
  276. f.initialPopulationCount = len(items)
  277. }
  278. f.items = items
  279. f.queue = f.queue[:0]
  280. for id := range items {
  281. f.queue = append(f.queue, id)
  282. }
  283. if len(f.queue) > 0 {
  284. f.cond.Broadcast()
  285. }
  286. return nil
  287. }
  288. // Resync will touch all objects to put them into the processing queue
  289. func (f *FIFO) Resync() error {
  290. f.lock.Lock()
  291. defer f.lock.Unlock()
  292. inQueue := sets.NewString()
  293. for _, id := range f.queue {
  294. inQueue.Insert(id)
  295. }
  296. for id := range f.items {
  297. if !inQueue.Has(id) {
  298. f.queue = append(f.queue, id)
  299. }
  300. }
  301. if len(f.queue) > 0 {
  302. f.cond.Broadcast()
  303. }
  304. return nil
  305. }
  306. // NewFIFO returns a Store which can be used to queue up items to
  307. // process.
  308. func NewFIFO(keyFunc KeyFunc) *FIFO {
  309. f := &FIFO{
  310. items: map[string]interface{}{},
  311. queue: []string{},
  312. keyFunc: keyFunc,
  313. }
  314. f.cond.L = &f.lock
  315. return f
  316. }