policy.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package ristretto
  17. import (
  18. "math"
  19. "sync"
  20. "sync/atomic"
  21. "github.com/dgraph-io/ristretto/z"
  22. )
  23. const (
  24. // lfuSample is the number of items to sample when looking at eviction
  25. // candidates. 5 seems to be the most optimal number [citation needed].
  26. lfuSample = 5
  27. )
  28. // policy is the interface encapsulating eviction/admission behavior.
  29. //
  30. // TODO: remove this interface and just rename defaultPolicy to policy, as we
  31. // are probably only going to use/implement/maintain one policy.
  32. type policy interface {
  33. ringConsumer
  34. // Add attempts to Add the key-cost pair to the Policy. It returns a slice
  35. // of evicted keys and a bool denoting whether or not the key-cost pair
  36. // was added. If it returns true, the key should be stored in cache.
  37. Add(uint64, int64) ([]*Item, bool)
  38. // Has returns true if the key exists in the Policy.
  39. Has(uint64) bool
  40. // Del deletes the key from the Policy.
  41. Del(uint64)
  42. // Cap returns the available capacity.
  43. Cap() int64
  44. // Close stops all goroutines and closes all channels.
  45. Close()
  46. // Update updates the cost value for the key.
  47. Update(uint64, int64)
  48. // Cost returns the cost value of a key or -1 if missing.
  49. Cost(uint64) int64
  50. // Optionally, set stats object to track how policy is performing.
  51. CollectMetrics(*Metrics)
  52. // Clear zeroes out all counters and clears hashmaps.
  53. Clear()
  54. // MaxCost returns the current max cost of the cache policy.
  55. MaxCost() int64
  56. // UpdateMaxCost updates the max cost of the cache policy.
  57. UpdateMaxCost(int64)
  58. }
  59. func newPolicy(numCounters, maxCost int64) policy {
  60. return newDefaultPolicy(numCounters, maxCost)
  61. }
  62. type defaultPolicy struct {
  63. sync.Mutex
  64. admit *tinyLFU
  65. evict *sampledLFU
  66. itemsCh chan []uint64
  67. stop chan struct{}
  68. isClosed bool
  69. metrics *Metrics
  70. }
  71. func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy {
  72. p := &defaultPolicy{
  73. admit: newTinyLFU(numCounters),
  74. evict: newSampledLFU(maxCost),
  75. itemsCh: make(chan []uint64, 3),
  76. stop: make(chan struct{}),
  77. }
  78. go p.processItems()
  79. return p
  80. }
  81. func (p *defaultPolicy) CollectMetrics(metrics *Metrics) {
  82. p.metrics = metrics
  83. p.evict.metrics = metrics
  84. }
  85. type policyPair struct {
  86. key uint64
  87. cost int64
  88. }
  89. func (p *defaultPolicy) processItems() {
  90. for {
  91. select {
  92. case items := <-p.itemsCh:
  93. p.Lock()
  94. p.admit.Push(items)
  95. p.Unlock()
  96. case <-p.stop:
  97. return
  98. }
  99. }
  100. }
  101. func (p *defaultPolicy) Push(keys []uint64) bool {
  102. if p.isClosed {
  103. return false
  104. }
  105. if len(keys) == 0 {
  106. return true
  107. }
  108. select {
  109. case p.itemsCh <- keys:
  110. p.metrics.add(keepGets, keys[0], uint64(len(keys)))
  111. return true
  112. default:
  113. p.metrics.add(dropGets, keys[0], uint64(len(keys)))
  114. return false
  115. }
  116. }
// Add decides whether the item with the given key and cost should be accepted by
// the policy. It returns the list of victims that have been evicted and a boolean
// indicating whether the incoming item should be accepted.
//
// The checks run in this order:
//   - an item costing more than the entire cache is rejected outright;
//   - an already-tracked key only has its cost updated, and Add returns
//     (nil, false);
//   - if the item fits in the remaining room it is admitted with no evictions;
//   - otherwise up to lfuSample tracked keys are sampled, and the one with the
//     lowest estimated frequency is evicted. This repeats until the item fits.
//     If the cheapest sampled key is estimated to be more frequent than the
//     incoming key, the incoming key is rejected instead.
//
// Victims evicted before a rejection are still returned (with false). They
// have already been removed from the policy's bookkeeping, so presumably the
// caller must drop them from its store too — confirm at the call site.
//
// The entire operation runs while holding p's mutex.
func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) {
	p.Lock()
	defer p.Unlock()
	// Cannot add an item bigger than entire cache.
	if cost > p.evict.getMaxCost() {
		return nil, false
	}
	// No need to go any further if the item is already in the cache.
	if has := p.evict.updateIfHas(key, cost); has {
		// An update does not count as an addition, so return false.
		return nil, false
	}
	// If the execution reaches this point, the key doesn't exist in the cache.
	// Calculate the remaining room in the cache (usually bytes).
	room := p.evict.roomLeft(cost)
	if room >= 0 {
		// There's enough room in the cache to store the new item without
		// overflowing. Do that now and stop here.
		p.evict.add(key, cost)
		p.metrics.add(costAdd, key, uint64(cost))
		return nil, true
	}
	// incHits is the hit count for the incoming item.
	incHits := p.admit.Estimate(key)
	// sample is the eviction candidate pool, filled from the tracked keys in
	// map iteration order.
	// TODO: perhaps we should use a min heap here. Right now our time
	// complexity is N for finding the min. Min heap should bring it down to
	// O(lg N).
	sample := make([]*policyPair, 0, lfuSample)
	// As items are evicted they will be appended to victims.
	victims := make([]*Item, 0)
	// Delete victims until there's enough space or a minKey is found that has
	// more hits than incoming item.
	for ; room < 0; room = p.evict.roomLeft(cost) {
		// Fill up empty slots in sample.
		sample = p.evict.fillSample(sample)
		// Find minimally used item in sample. If sample is empty, minHits
		// stays at MaxInt64, so the check below rejects the incoming item.
		minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
		for i, pair := range sample {
			// Look up hit count for sample key.
			if hits := p.admit.Estimate(pair.key); hits < minHits {
				minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
			}
		}
		// If the incoming item isn't worth keeping in the policy, reject.
		if incHits < minHits {
			p.metrics.add(rejectSets, key, 1)
			return victims, false
		}
		// Delete the victim from metadata.
		p.evict.del(minKey)
		// Delete the victim from sample. Order within sample does not matter,
		// so swap in the last entry and shrink by one.
		sample[minId] = sample[len(sample)-1]
		sample = sample[:len(sample)-1]
		// Store victim in evicted victims slice. The policy does not track
		// conflict values, so Conflict is left as 0.
		victims = append(victims, &Item{
			Key:      minKey,
			Conflict: 0,
			Cost:     minCost,
		})
	}
	p.evict.add(key, cost)
	p.metrics.add(costAdd, key, uint64(cost))
	return victims, true
}
  185. func (p *defaultPolicy) Has(key uint64) bool {
  186. p.Lock()
  187. _, exists := p.evict.keyCosts[key]
  188. p.Unlock()
  189. return exists
  190. }
  191. func (p *defaultPolicy) Del(key uint64) {
  192. p.Lock()
  193. p.evict.del(key)
  194. p.Unlock()
  195. }
  196. func (p *defaultPolicy) Cap() int64 {
  197. p.Lock()
  198. capacity := int64(p.evict.getMaxCost() - p.evict.used)
  199. p.Unlock()
  200. return capacity
  201. }
  202. func (p *defaultPolicy) Update(key uint64, cost int64) {
  203. p.Lock()
  204. p.evict.updateIfHas(key, cost)
  205. p.Unlock()
  206. }
  207. func (p *defaultPolicy) Cost(key uint64) int64 {
  208. p.Lock()
  209. if cost, found := p.evict.keyCosts[key]; found {
  210. p.Unlock()
  211. return cost
  212. }
  213. p.Unlock()
  214. return -1
  215. }
  216. func (p *defaultPolicy) Clear() {
  217. p.Lock()
  218. p.admit.clear()
  219. p.evict.clear()
  220. p.Unlock()
  221. }
  222. func (p *defaultPolicy) Close() {
  223. if p.isClosed {
  224. return
  225. }
  226. // Block until the p.processItems goroutine returns.
  227. p.stop <- struct{}{}
  228. close(p.stop)
  229. close(p.itemsCh)
  230. p.isClosed = true
  231. }
  232. func (p *defaultPolicy) MaxCost() int64 {
  233. if p == nil || p.evict == nil {
  234. return 0
  235. }
  236. return p.evict.getMaxCost()
  237. }
  238. func (p *defaultPolicy) UpdateMaxCost(maxCost int64) {
  239. if p == nil || p.evict == nil {
  240. return
  241. }
  242. p.evict.updateMaxCost(maxCost)
  243. }
  244. // sampledLFU is an eviction helper storing key-cost pairs.
  245. type sampledLFU struct {
  246. // NOTE: align maxCost to 64-bit boundary for use with atomic.
  247. // As per https://golang.org/pkg/sync/atomic/: "On ARM, x86-32,
  248. // and 32-bit MIPS, it is the caller’s responsibility to arrange
  249. // for 64-bit alignment of 64-bit words accessed atomically.
  250. // The first word in a variable or in an allocated struct, array,
  251. // or slice can be relied upon to be 64-bit aligned."
  252. maxCost int64
  253. used int64
  254. metrics *Metrics
  255. keyCosts map[uint64]int64
  256. }
  257. func newSampledLFU(maxCost int64) *sampledLFU {
  258. return &sampledLFU{
  259. keyCosts: make(map[uint64]int64),
  260. maxCost: maxCost,
  261. }
  262. }
  263. func (p *sampledLFU) getMaxCost() int64 {
  264. return atomic.LoadInt64(&p.maxCost)
  265. }
  266. func (p *sampledLFU) updateMaxCost(maxCost int64) {
  267. atomic.StoreInt64(&p.maxCost, maxCost)
  268. }
  269. func (p *sampledLFU) roomLeft(cost int64) int64 {
  270. return p.getMaxCost() - (p.used + cost)
  271. }
  272. func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
  273. if len(in) >= lfuSample {
  274. return in
  275. }
  276. for key, cost := range p.keyCosts {
  277. in = append(in, &policyPair{key, cost})
  278. if len(in) >= lfuSample {
  279. return in
  280. }
  281. }
  282. return in
  283. }
  284. func (p *sampledLFU) del(key uint64) {
  285. cost, ok := p.keyCosts[key]
  286. if !ok {
  287. return
  288. }
  289. p.used -= cost
  290. delete(p.keyCosts, key)
  291. p.metrics.add(costEvict, key, uint64(cost))
  292. p.metrics.add(keyEvict, key, 1)
  293. }
  294. func (p *sampledLFU) add(key uint64, cost int64) {
  295. p.keyCosts[key] = cost
  296. p.used += cost
  297. }
  298. func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
  299. if prev, found := p.keyCosts[key]; found {
  300. // Update the cost of an existing key, but don't worry about evicting.
  301. // Evictions will be handled the next time a new item is added.
  302. p.metrics.add(keyUpdate, key, 1)
  303. if prev > cost {
  304. diff := prev - cost
  305. p.metrics.add(costAdd, key, ^uint64(uint64(diff)-1))
  306. } else if cost > prev {
  307. diff := cost - prev
  308. p.metrics.add(costAdd, key, uint64(diff))
  309. }
  310. p.used += cost - prev
  311. p.keyCosts[key] = cost
  312. return true
  313. }
  314. return false
  315. }
  316. func (p *sampledLFU) clear() {
  317. p.used = 0
  318. p.keyCosts = make(map[uint64]int64)
  319. }
  320. // tinyLFU is an admission helper that keeps track of access frequency using
  321. // tiny (4-bit) counters in the form of a count-min sketch.
  322. // tinyLFU is NOT thread safe.
  323. type tinyLFU struct {
  324. freq *cmSketch
  325. door *z.Bloom
  326. incrs int64
  327. resetAt int64
  328. }
  329. func newTinyLFU(numCounters int64) *tinyLFU {
  330. return &tinyLFU{
  331. freq: newCmSketch(numCounters),
  332. door: z.NewBloomFilter(float64(numCounters), 0.01),
  333. resetAt: numCounters,
  334. }
  335. }
  336. func (p *tinyLFU) Push(keys []uint64) {
  337. for _, key := range keys {
  338. p.Increment(key)
  339. }
  340. }
  341. func (p *tinyLFU) Estimate(key uint64) int64 {
  342. hits := p.freq.Estimate(key)
  343. if p.door.Has(key) {
  344. hits++
  345. }
  346. return hits
  347. }
  348. func (p *tinyLFU) Increment(key uint64) {
  349. // Flip doorkeeper bit if not already done.
  350. if added := p.door.AddIfNotHas(key); !added {
  351. // Increment count-min counter if doorkeeper bit is already set.
  352. p.freq.Increment(key)
  353. }
  354. p.incrs++
  355. if p.incrs >= p.resetAt {
  356. p.reset()
  357. }
  358. }
  359. func (p *tinyLFU) reset() {
  360. // Zero out incrs.
  361. p.incrs = 0
  362. // clears doorkeeper bits
  363. p.door.Clear()
  364. // halves count-min counters
  365. p.freq.Reset()
  366. }
  367. func (p *tinyLFU) clear() {
  368. p.incrs = 0
  369. p.door.Clear()
  370. p.freq.Clear()
  371. }