tx_check.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package bbolt
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. )
  6. // Check performs several consistency checks on the database for this transaction.
  7. // An error is returned if any inconsistency is found.
  8. //
  9. // It can be safely run concurrently on a writable transaction. However, this
  10. // incurs a high cost for large databases and databases with a lot of subbuckets
  11. // because of caching. This overhead can be removed if running on a read-only
  12. // transaction, however, it is not safe to execute other writer transactions at
  13. // the same time.
  14. func (tx *Tx) Check() <-chan error {
  15. return tx.CheckWithOptions()
  16. }
  17. // CheckWithOptions allows users to provide a customized `KVStringer` implementation,
  18. // so that bolt can generate human-readable diagnostic messages.
  19. func (tx *Tx) CheckWithOptions(options ...CheckOption) <-chan error {
  20. chkConfig := checkConfig{
  21. kvStringer: HexKVStringer(),
  22. }
  23. for _, op := range options {
  24. op(&chkConfig)
  25. }
  26. ch := make(chan error)
  27. go tx.check(chkConfig.kvStringer, ch)
  28. return ch
  29. }
  30. func (tx *Tx) check(kvStringer KVStringer, ch chan error) {
  31. // Force loading free list if opened in ReadOnly mode.
  32. tx.db.loadFreelist()
  33. // Check if any pages are double freed.
  34. freed := make(map[pgid]bool)
  35. all := make([]pgid, tx.db.freelist.count())
  36. tx.db.freelist.copyall(all)
  37. for _, id := range all {
  38. if freed[id] {
  39. ch <- fmt.Errorf("page %d: already freed", id)
  40. }
  41. freed[id] = true
  42. }
  43. // Track every reachable page.
  44. reachable := make(map[pgid]*page)
  45. reachable[0] = tx.page(0) // meta0
  46. reachable[1] = tx.page(1) // meta1
  47. if tx.meta.freelist != pgidNoFreelist {
  48. for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
  49. reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
  50. }
  51. }
  52. // Recursively check buckets.
  53. tx.checkBucket(&tx.root, reachable, freed, kvStringer, ch)
  54. // Ensure all pages below high water mark are either reachable or freed.
  55. for i := pgid(0); i < tx.meta.pgid; i++ {
  56. _, isReachable := reachable[i]
  57. if !isReachable && !freed[i] {
  58. ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
  59. }
  60. }
  61. // Close the channel to signal completion.
  62. close(ch)
  63. }
  64. func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool,
  65. kvStringer KVStringer, ch chan error) {
  66. // Ignore inline buckets.
  67. if b.root == 0 {
  68. return
  69. }
  70. // Check every page used by this bucket.
  71. b.tx.forEachPage(b.root, func(p *page, _ int, stack []pgid) {
  72. if p.id > tx.meta.pgid {
  73. ch <- fmt.Errorf("page %d: out of bounds: %d (stack: %v)", int(p.id), int(b.tx.meta.pgid), stack)
  74. }
  75. // Ensure each page is only referenced once.
  76. for i := pgid(0); i <= pgid(p.overflow); i++ {
  77. var id = p.id + i
  78. if _, ok := reachable[id]; ok {
  79. ch <- fmt.Errorf("page %d: multiple references (stack: %v)", int(id), stack)
  80. }
  81. reachable[id] = p
  82. }
  83. // We should only encounter un-freed leaf and branch pages.
  84. if freed[p.id] {
  85. ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
  86. } else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
  87. ch <- fmt.Errorf("page %d: invalid type: %s (stack: %v)", int(p.id), p.typ(), stack)
  88. }
  89. })
  90. tx.recursivelyCheckPages(b.root, kvStringer.KeyToString, ch)
  91. // Check each bucket within this bucket.
  92. _ = b.ForEachBucket(func(k []byte) error {
  93. if child := b.Bucket(k); child != nil {
  94. tx.checkBucket(child, reachable, freed, kvStringer, ch)
  95. }
  96. return nil
  97. })
  98. }
  99. // recursivelyCheckPages confirms database consistency with respect to b-tree
  100. // key order constraints:
  101. // - keys on pages must be sorted
  102. // - keys on children pages are between 2 consecutive keys on the parent's branch page).
  103. func (tx *Tx) recursivelyCheckPages(pgId pgid, keyToString func([]byte) string, ch chan error) {
  104. tx.recursivelyCheckPagesInternal(pgId, nil, nil, nil, keyToString, ch)
  105. }
  106. // recursivelyCheckPagesInternal verifies that all keys in the subtree rooted at `pgid` are:
  107. // - >=`minKeyClosed` (can be nil)
  108. // - <`maxKeyOpen` (can be nil)
  109. // - Are in right ordering relationship to their parents.
  110. // `pagesStack` is expected to contain IDs of pages from the tree root to `pgid` for the clean debugging message.
  111. func (tx *Tx) recursivelyCheckPagesInternal(
  112. pgId pgid, minKeyClosed, maxKeyOpen []byte, pagesStack []pgid,
  113. keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) {
  114. p := tx.page(pgId)
  115. pagesStack = append(pagesStack, pgId)
  116. switch {
  117. case p.flags&branchPageFlag != 0:
  118. // For branch page we navigate ranges of all subpages.
  119. runningMin := minKeyClosed
  120. for i := range p.branchPageElements() {
  121. elem := p.branchPageElement(uint16(i))
  122. verifyKeyOrder(elem.pgid, "branch", i, elem.key(), runningMin, maxKeyOpen, ch, keyToString, pagesStack)
  123. maxKey := maxKeyOpen
  124. if i < len(p.branchPageElements())-1 {
  125. maxKey = p.branchPageElement(uint16(i + 1)).key()
  126. }
  127. maxKeyInSubtree = tx.recursivelyCheckPagesInternal(elem.pgid, elem.key(), maxKey, pagesStack, keyToString, ch)
  128. runningMin = maxKeyInSubtree
  129. }
  130. return maxKeyInSubtree
  131. case p.flags&leafPageFlag != 0:
  132. runningMin := minKeyClosed
  133. for i := range p.leafPageElements() {
  134. elem := p.leafPageElement(uint16(i))
  135. verifyKeyOrder(pgId, "leaf", i, elem.key(), runningMin, maxKeyOpen, ch, keyToString, pagesStack)
  136. runningMin = elem.key()
  137. }
  138. if p.count > 0 {
  139. return p.leafPageElement(p.count - 1).key()
  140. }
  141. default:
  142. ch <- fmt.Errorf("unexpected page type for pgId:%d", pgId)
  143. }
  144. return maxKeyInSubtree
  145. }
  146. /***
  147. * verifyKeyOrder checks whether an entry with given #index on pgId (pageType: "branch|leaf") that has given "key",
  148. * is within range determined by (previousKey..maxKeyOpen) and reports found violations to the channel (ch).
  149. */
  150. func verifyKeyOrder(pgId pgid, pageType string, index int, key []byte, previousKey []byte, maxKeyOpen []byte, ch chan error, keyToString func([]byte) string, pagesStack []pgid) {
  151. if index == 0 && previousKey != nil && compareKeys(previousKey, key) > 0 {
  152. ch <- fmt.Errorf("the first key[%d]=(hex)%s on %s page(%d) needs to be >= the key in the ancestor (%s). Stack: %v",
  153. index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
  154. }
  155. if index > 0 {
  156. cmpRet := compareKeys(previousKey, key)
  157. if cmpRet > 0 {
  158. ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found <) than previous element (hex)%s. Stack: %v",
  159. index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
  160. }
  161. if cmpRet == 0 {
  162. ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found =) than previous element (hex)%s. Stack: %v",
  163. index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
  164. }
  165. }
  166. if maxKeyOpen != nil && compareKeys(key, maxKeyOpen) >= 0 {
  167. ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be < than key of the next element in ancestor (hex)%s. Pages stack: %v",
  168. index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
  169. }
  170. }
  171. // ===========================================================================================
  172. type checkConfig struct {
  173. kvStringer KVStringer
  174. }
  175. type CheckOption func(options *checkConfig)
  176. func WithKVStringer(kvStringer KVStringer) CheckOption {
  177. return func(c *checkConfig) {
  178. c.kvStringer = kvStringer
  179. }
  180. }
  181. // KVStringer allows to prepare human-readable diagnostic messages.
  182. type KVStringer interface {
  183. KeyToString([]byte) string
  184. ValueToString([]byte) string
  185. }
  186. // HexKVStringer serializes both key & value to hex representation.
  187. func HexKVStringer() KVStringer {
  188. return hexKvStringer{}
  189. }
  190. type hexKvStringer struct{}
  191. func (_ hexKvStringer) KeyToString(key []byte) string {
  192. return hex.EncodeToString(key)
  193. }
  194. func (_ hexKvStringer) ValueToString(value []byte) string {
  195. return hex.EncodeToString(value)
  196. }