ttl.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package ristretto
  17. import (
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. // TODO: find the optimal value or make it configurable.
  23. bucketDurationSecs = int64(5)
  24. )
  25. func storageBucket(t time.Time) int64 {
  26. return (t.Unix() / bucketDurationSecs) + 1
  27. }
  28. func cleanupBucket(t time.Time) int64 {
  29. // The bucket to cleanup is always behind the storage bucket by one so that
  30. // no elements in that bucket (which might not have expired yet) are deleted.
  31. return storageBucket(t) - 1
  32. }
  33. // bucket type is a map of key to conflict.
  34. type bucket map[uint64]uint64
  35. // expirationMap is a map of bucket number to the corresponding bucket.
  36. type expirationMap struct {
  37. sync.RWMutex
  38. buckets map[int64]bucket
  39. }
  40. func newExpirationMap() *expirationMap {
  41. return &expirationMap{
  42. buckets: make(map[int64]bucket),
  43. }
  44. }
  45. func (m *expirationMap) add(key, conflict uint64, expiration time.Time) {
  46. if m == nil {
  47. return
  48. }
  49. // Items that don't expire don't need to be in the expiration map.
  50. if expiration.IsZero() {
  51. return
  52. }
  53. bucketNum := storageBucket(expiration)
  54. m.Lock()
  55. defer m.Unlock()
  56. b, ok := m.buckets[bucketNum]
  57. if !ok {
  58. b = make(bucket)
  59. m.buckets[bucketNum] = b
  60. }
  61. b[key] = conflict
  62. }
  63. func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time.Time) {
  64. if m == nil {
  65. return
  66. }
  67. m.Lock()
  68. defer m.Unlock()
  69. oldBucketNum := storageBucket(oldExpTime)
  70. oldBucket, ok := m.buckets[oldBucketNum]
  71. if ok {
  72. delete(oldBucket, key)
  73. }
  74. newBucketNum := storageBucket(newExpTime)
  75. newBucket, ok := m.buckets[newBucketNum]
  76. if !ok {
  77. newBucket = make(bucket)
  78. m.buckets[newBucketNum] = newBucket
  79. }
  80. newBucket[key] = conflict
  81. }
  82. func (m *expirationMap) del(key uint64, expiration time.Time) {
  83. if m == nil {
  84. return
  85. }
  86. bucketNum := storageBucket(expiration)
  87. m.Lock()
  88. defer m.Unlock()
  89. _, ok := m.buckets[bucketNum]
  90. if !ok {
  91. return
  92. }
  93. delete(m.buckets[bucketNum], key)
  94. }
  95. // cleanup removes all the items in the bucket that was just completed. It deletes
  96. // those items from the store, and calls the onEvict function on those items.
  97. // This function is meant to be called periodically.
  98. func (m *expirationMap) cleanup(store store, policy policy, onEvict itemCallback) {
  99. if m == nil {
  100. return
  101. }
  102. m.Lock()
  103. now := time.Now()
  104. bucketNum := cleanupBucket(now)
  105. keys := m.buckets[bucketNum]
  106. delete(m.buckets, bucketNum)
  107. m.Unlock()
  108. for key, conflict := range keys {
  109. // Sanity check. Verify that the store agrees that this key is expired.
  110. if store.Expiration(key).After(now) {
  111. continue
  112. }
  113. cost := policy.Cost(key)
  114. policy.Del(key)
  115. _, value := store.Del(key, conflict)
  116. if onEvict != nil {
  117. onEvict(&Item{Key: key,
  118. Conflict: conflict,
  119. Value: value,
  120. Cost: cost,
  121. })
  122. }
  123. }
  124. }