// Copyright 2019 Yunion // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package cache import ( "errors" "sync" "yunion.io/x/pkg/util/sets" ) // PopProcessFunc is passed to Pop() method of Queue interface. // It is supposed to process the element popped from the queue. type PopProcessFunc func(interface{}) error // ErrRequeue may be returned by a PopProcessFunc to safely requeue // the current item. The value of Err will be returned from Pop. type ErrRequeue struct { // Err is returned by the Pop function Err error } var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue") func (e ErrRequeue) Error() string { if e.Err == nil { return "the popped item should be requeued without returning an error" } return e.Err.Error() } // Queue is exactly like a Store, but has a Pop() method too. type Queue interface { Store // Pop blocks until it has something to process. // It returns the object that was process and the result of processing. // The PopProcessFunc may return an ErrRequeue{...} to indicate the item // should be requeued before releasing the lock on the queue. Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent adds a value previously // returned by Pop back into the queue as long // as nothing else (presumably more recent) // has since been added. AddIfNotPresent(interface{}) error // HasSynced returns true if the first batch of items has been popped HasSynced() bool // Close queue Close() } // Helper function for popping from Queue. // WARNING: Do NOT use this function in non-test code to avoid races // unless you really really really really know what you are doing. func Pop(queue Queue) interface{} { var result interface{} queue.Pop(func(obj interface{}) error { result = obj return nil }) return result } // FIFO receives adds and updates from a Reflector, and puts them in a queue for // FIFO order processing. If multiple adds/updates of a single item happen while // an item is in the queue before it has been processed, it will only be // processed once, and when it is processed, the most recent version will be // processed. This can't be done with a channel. // // FIFO solves this use case: // * You want to process every object (exactly) once. // * You want to process the most recent version of the object when you process it. // * You do not want to process deleted objects, they should be removed from the queue. // * You do not want to periodically reprocess objects. // Compare with DeltaFIFO for other use cases. type FIFO struct { lock sync.RWMutex cond sync.Cond // We depend on the property that items in the set are in the queue and vice versa. items map[string]interface{} queue []string // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool // initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int // keyFunc is used to make the key used for queued item insertion and retrieval, and // should be deterministic. keyFunc KeyFunc // Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex } var ( _ = Queue(&FIFO{}) // FIFO is a Queue ) // Close the queue. func (f *FIFO) Close() { f.closedLock.Lock() defer f.closedLock.Unlock() f.closed = true f.cond.Broadcast() } // Return true if an Add/Update/Delete/AddIfNotPresent are called first, // or an Update called first but the first batch of items inserted by Replace() has been popped func (f *FIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() return f.populated && f.initialPopulationCount == 0 } // Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn't already exist in the set. func (f *FIFO) Add(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.populated = true if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = obj f.cond.Broadcast() return nil } // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already // present in the set, it is neither enqueued nor added to the set. // // This is useful in a single producer/consumer scenario so that the consumer can // safely retry items without contending with the producer and potentially enqueueing // stale items. func (f *FIFO) AddIfNotPresent(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.addIfNotPresent(id, obj) return nil } // addIfNotPresent assumes the fifo lock is already held and adds the provided // item to the queue under id if it does not already exist. func (f *FIFO) addIfNotPresent(id string, obj interface{}) { f.populated = true if _, exists := f.items[id]; exists { return } f.queue = append(f.queue, id) f.items[id] = obj f.cond.Broadcast() } // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) } // Delete removes an item. It doesn't add it to the queue, because // this implementation assumes the consumer only cares about the objects, // not the order in which they were created/added. func (f *FIFO) Delete(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.populated = true delete(f.items, id) return err } // List returns a list of all the items. func (f *FIFO) List() []interface{} { f.lock.RLock() defer f.lock.RUnlock() list := make([]interface{}, 0, len(f.items)) for _, item := range f.items { list = append(list, item) } return list } // ListKeys returns a list of all the keys of the objects currently // in the FIFO. func (f *FIFO) ListKeys() []string { f.lock.RLock() defer f.lock.RUnlock() list := make([]string, 0, len(f.items)) for key := range f.items { list = append(list, key) } return list } // Get returns the requested item, or sets exists=false. func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.keyFunc(obj) if err != nil { return nil, false, KeyError{obj, err} } return f.GetByKey(key) } // GetByKey returns the requested item, or sets exists=false. func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { f.lock.RLock() defer f.lock.RUnlock() item, exists = f.items[key] return item, exists, nil } // Checks if the queue is closed func (f *FIFO) IsClosed() bool { f.closedLock.Lock() defer f.closedLock.Unlock() if f.closed { return true } return false } // Pop waits until an item is ready and processes it. If multiple items are // ready, they are returned in the order in which they were added/updated. // The item is removed from the queue (and the store) before it is processed, // so if you don't successfully process it, it should be added back with // AddIfNotPresent(). process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { return nil, FIFOClosedError } f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } return item, err } } // Replace will delete the contents of 'f', using instead the given map. // 'f' takes ownership of the map, you should not reference the map again // after calling this function. f's queue is reset, too; upon return, it // will contain the items in the map, in no particular order. func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} for _, item := range list { key, err := f.keyFunc(item) if err != nil { return KeyError{item, err} } items[key] = item } f.lock.Lock() defer f.lock.Unlock() if !f.populated { f.populated = true f.initialPopulationCount = len(items) } f.items = items f.queue = f.queue[:0] for id := range items { f.queue = append(f.queue, id) } if len(f.queue) > 0 { f.cond.Broadcast() } return nil } // Resync will touch all objects to put them into the processing queue func (f *FIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() inQueue := sets.NewString() for _, id := range f.queue { inQueue.Insert(id) } for id := range f.items { if !inQueue.Has(id) { f.queue = append(f.queue, id) } } if len(f.queue) > 0 { f.cond.Broadcast() } return nil } // NewFIFO returns a Store which can be used to queue up items to // process. func NewFIFO(keyFunc KeyFunc) *FIFO { f := &FIFO{ items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, } f.cond.L = &f.lock return f }