compact.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package bbolt
  2. // Compact will create a copy of the source DB and in the destination DB. This may
  3. // reclaim space that the source database no longer has use for. txMaxSize can be
  4. // used to limit the transactions size of this process and may trigger intermittent
  5. // commits. A value of zero will ignore transaction sizes.
  6. // TODO: merge with: https://github.com/etcd-io/etcd/blob/b7f0f52a16dbf83f18ca1d803f7892d750366a94/mvcc/backend/backend.go#L349
  7. func Compact(dst, src *DB, txMaxSize int64) error {
  8. // commit regularly, or we'll run out of memory for large datasets if using one transaction.
  9. var size int64
  10. tx, err := dst.Begin(true)
  11. if err != nil {
  12. return err
  13. }
  14. defer func() {
  15. if tempErr := tx.Rollback(); tempErr != nil {
  16. err = tempErr
  17. }
  18. }()
  19. if err := walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
  20. // On each key/value, check if we have exceeded tx size.
  21. sz := int64(len(k) + len(v))
  22. if size+sz > txMaxSize && txMaxSize != 0 {
  23. // Commit previous transaction.
  24. if err := tx.Commit(); err != nil {
  25. return err
  26. }
  27. // Start new transaction.
  28. tx, err = dst.Begin(true)
  29. if err != nil {
  30. return err
  31. }
  32. size = 0
  33. }
  34. size += sz
  35. // Create bucket on the root transaction if this is the first level.
  36. nk := len(keys)
  37. if nk == 0 {
  38. bkt, err := tx.CreateBucket(k)
  39. if err != nil {
  40. return err
  41. }
  42. if err := bkt.SetSequence(seq); err != nil {
  43. return err
  44. }
  45. return nil
  46. }
  47. // Create buckets on subsequent levels, if necessary.
  48. b := tx.Bucket(keys[0])
  49. if nk > 1 {
  50. for _, k := range keys[1:] {
  51. b = b.Bucket(k)
  52. }
  53. }
  54. // Fill the entire page for best compaction.
  55. b.FillPercent = 1.0
  56. // If there is no value then this is a bucket call.
  57. if v == nil {
  58. bkt, err := b.CreateBucket(k)
  59. if err != nil {
  60. return err
  61. }
  62. if err := bkt.SetSequence(seq); err != nil {
  63. return err
  64. }
  65. return nil
  66. }
  67. // Otherwise treat it as a key/value pair.
  68. return b.Put(k, v)
  69. }); err != nil {
  70. return err
  71. }
  72. err = tx.Commit()
  73. return err
  74. }
  75. // walkFunc is the type of the function called for keys (buckets and "normal"
  76. // values) discovered by Walk. keys is the list of keys to descend to the bucket
  77. // owning the discovered key/value pair k/v.
  78. type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
  79. // walk walks recursively the bolt database db, calling walkFn for each key it finds.
  80. func walk(db *DB, walkFn walkFunc) error {
  81. return db.View(func(tx *Tx) error {
  82. return tx.ForEach(func(name []byte, b *Bucket) error {
  83. return walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
  84. })
  85. })
  86. }
  87. func walkBucket(b *Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
  88. // Execute callback.
  89. if err := fn(keypath, k, v, seq); err != nil {
  90. return err
  91. }
  92. // If this is not a bucket then stop.
  93. if v != nil {
  94. return nil
  95. }
  96. // Iterate over each child key/value.
  97. keypath = append(keypath, k)
  98. return b.ForEach(func(k, v []byte) error {
  99. if v == nil {
  100. bkt := b.Bucket(k)
  101. return walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
  102. }
  103. return walkBucket(b, keypath, k, v, b.Sequence(), fn)
  104. })
  105. }