tx.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797
  1. package bbolt
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "sort"
  7. "strings"
  8. "sync/atomic"
  9. "time"
  10. "unsafe"
  11. )
  12. // txid represents the internal transaction identifier.
  13. type txid uint64
  14. // Tx represents a read-only or read/write transaction on the database.
  15. // Read-only transactions can be used for retrieving values for keys and creating cursors.
  16. // Read/write transactions can create and remove buckets and create and remove keys.
  17. //
  18. // IMPORTANT: You must commit or rollback transactions when you are done with
  19. // them. Pages can not be reclaimed by the writer until no more transactions
  20. // are using them. A long running read transaction can cause the database to
  21. // quickly grow.
  22. type Tx struct {
  23. writable bool
  24. managed bool
  25. db *DB
  26. meta *meta
  27. root Bucket
  28. pages map[pgid]*page
  29. stats TxStats
  30. commitHandlers []func()
  31. // WriteFlag specifies the flag for write-related methods like WriteTo().
  32. // Tx opens the database file with the specified flag to copy the data.
  33. //
  34. // By default, the flag is unset, which works well for mostly in-memory
  35. // workloads. For databases that are much larger than available RAM,
  36. // set the flag to syscall.O_DIRECT to avoid trashing the page cache.
  37. WriteFlag int
  38. }
  39. // init initializes the transaction.
  40. func (tx *Tx) init(db *DB) {
  41. tx.db = db
  42. tx.pages = nil
  43. // Copy the meta page since it can be changed by the writer.
  44. tx.meta = &meta{}
  45. db.meta().copy(tx.meta)
  46. // Copy over the root bucket.
  47. tx.root = newBucket(tx)
  48. tx.root.bucket = &bucket{}
  49. *tx.root.bucket = tx.meta.root
  50. // Increment the transaction id and add a page cache for writable transactions.
  51. if tx.writable {
  52. tx.pages = make(map[pgid]*page)
  53. tx.meta.txid += txid(1)
  54. }
  55. }
  56. // ID returns the transaction id.
  57. func (tx *Tx) ID() int {
  58. return int(tx.meta.txid)
  59. }
  60. // DB returns a reference to the database that created the transaction.
  61. func (tx *Tx) DB() *DB {
  62. return tx.db
  63. }
  64. // Size returns current database size in bytes as seen by this transaction.
  65. func (tx *Tx) Size() int64 {
  66. return int64(tx.meta.pgid) * int64(tx.db.pageSize)
  67. }
  68. // Writable returns whether the transaction can perform write operations.
  69. func (tx *Tx) Writable() bool {
  70. return tx.writable
  71. }
  72. // Cursor creates a cursor associated with the root bucket.
  73. // All items in the cursor will return a nil value because all root bucket keys point to buckets.
  74. // The cursor is only valid as long as the transaction is open.
  75. // Do not use a cursor after the transaction is closed.
  76. func (tx *Tx) Cursor() *Cursor {
  77. return tx.root.Cursor()
  78. }
  79. // Stats retrieves a copy of the current transaction statistics.
  80. func (tx *Tx) Stats() TxStats {
  81. return tx.stats
  82. }
  83. // Bucket retrieves a bucket by name.
  84. // Returns nil if the bucket does not exist.
  85. // The bucket instance is only valid for the lifetime of the transaction.
  86. func (tx *Tx) Bucket(name []byte) *Bucket {
  87. return tx.root.Bucket(name)
  88. }
  89. // CreateBucket creates a new bucket.
  90. // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
  91. // The bucket instance is only valid for the lifetime of the transaction.
  92. func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
  93. return tx.root.CreateBucket(name)
  94. }
  95. // CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
  96. // Returns an error if the bucket name is blank, or if the bucket name is too long.
  97. // The bucket instance is only valid for the lifetime of the transaction.
  98. func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
  99. return tx.root.CreateBucketIfNotExists(name)
  100. }
  101. // DeleteBucket deletes a bucket.
  102. // Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
  103. func (tx *Tx) DeleteBucket(name []byte) error {
  104. return tx.root.DeleteBucket(name)
  105. }
  106. // ForEach executes a function for each bucket in the root.
  107. // If the provided function returns an error then the iteration is stopped and
  108. // the error is returned to the caller.
  109. func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
  110. return tx.root.ForEach(func(k, v []byte) error {
  111. return fn(k, tx.root.Bucket(k))
  112. })
  113. }
  114. // OnCommit adds a handler function to be executed after the transaction successfully commits.
  115. func (tx *Tx) OnCommit(fn func()) {
  116. tx.commitHandlers = append(tx.commitHandlers, fn)
  117. }
  118. // Commit writes all changes to disk and updates the meta page.
  119. // Returns an error if a disk write error occurs, or if Commit is
  120. // called on a read-only transaction.
  121. func (tx *Tx) Commit() error {
  122. _assert(!tx.managed, "managed tx commit not allowed")
  123. if tx.db == nil {
  124. return ErrTxClosed
  125. } else if !tx.writable {
  126. return ErrTxNotWritable
  127. }
  128. // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
  129. // Rebalance nodes which have had deletions.
  130. var startTime = time.Now()
  131. tx.root.rebalance()
  132. if tx.stats.GetRebalance() > 0 {
  133. tx.stats.IncRebalanceTime(time.Since(startTime))
  134. }
  135. opgid := tx.meta.pgid
  136. // spill data onto dirty pages.
  137. startTime = time.Now()
  138. if err := tx.root.spill(); err != nil {
  139. tx.rollback()
  140. return err
  141. }
  142. tx.stats.IncSpillTime(time.Since(startTime))
  143. // Free the old root bucket.
  144. tx.meta.root.root = tx.root.root
  145. // Free the old freelist because commit writes out a fresh freelist.
  146. if tx.meta.freelist != pgidNoFreelist {
  147. tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
  148. }
  149. if !tx.db.NoFreelistSync {
  150. err := tx.commitFreelist()
  151. if err != nil {
  152. return err
  153. }
  154. } else {
  155. tx.meta.freelist = pgidNoFreelist
  156. }
  157. // If the high water mark has moved up then attempt to grow the database.
  158. if tx.meta.pgid > opgid {
  159. if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
  160. tx.rollback()
  161. return err
  162. }
  163. }
  164. // Write dirty pages to disk.
  165. startTime = time.Now()
  166. if err := tx.write(); err != nil {
  167. tx.rollback()
  168. return err
  169. }
  170. // If strict mode is enabled then perform a consistency check.
  171. if tx.db.StrictMode {
  172. ch := tx.Check()
  173. var errs []string
  174. for {
  175. err, ok := <-ch
  176. if !ok {
  177. break
  178. }
  179. errs = append(errs, err.Error())
  180. }
  181. if len(errs) > 0 {
  182. panic("check fail: " + strings.Join(errs, "\n"))
  183. }
  184. }
  185. // Write meta to disk.
  186. if err := tx.writeMeta(); err != nil {
  187. tx.rollback()
  188. return err
  189. }
  190. tx.stats.IncWriteTime(time.Since(startTime))
  191. // Finalize the transaction.
  192. tx.close()
  193. // Execute commit handlers now that the locks have been removed.
  194. for _, fn := range tx.commitHandlers {
  195. fn()
  196. }
  197. return nil
  198. }
  199. func (tx *Tx) commitFreelist() error {
  200. // Allocate new pages for the new free list. This will overestimate
  201. // the size of the freelist but not underestimate the size (which would be bad).
  202. p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
  203. if err != nil {
  204. tx.rollback()
  205. return err
  206. }
  207. if err := tx.db.freelist.write(p); err != nil {
  208. tx.rollback()
  209. return err
  210. }
  211. tx.meta.freelist = p.id
  212. return nil
  213. }
  214. // Rollback closes the transaction and ignores all previous updates. Read-only
  215. // transactions must be rolled back and not committed.
  216. func (tx *Tx) Rollback() error {
  217. _assert(!tx.managed, "managed tx rollback not allowed")
  218. if tx.db == nil {
  219. return ErrTxClosed
  220. }
  221. tx.nonPhysicalRollback()
  222. return nil
  223. }
  224. // nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk.
  225. func (tx *Tx) nonPhysicalRollback() {
  226. if tx.db == nil {
  227. return
  228. }
  229. if tx.writable {
  230. tx.db.freelist.rollback(tx.meta.txid)
  231. }
  232. tx.close()
  233. }
  234. // rollback needs to reload the free pages from disk in case some system error happens like fsync error.
  235. func (tx *Tx) rollback() {
  236. if tx.db == nil {
  237. return
  238. }
  239. if tx.writable {
  240. tx.db.freelist.rollback(tx.meta.txid)
  241. // When mmap fails, the `data`, `dataref` and `datasz` may be reset to
  242. // zero values, and there is no way to reload free page IDs in this case.
  243. if tx.db.data != nil {
  244. if !tx.db.hasSyncedFreelist() {
  245. // Reconstruct free page list by scanning the DB to get the whole free page list.
  246. // Note: scaning the whole db is heavy if your db size is large in NoSyncFreeList mode.
  247. tx.db.freelist.noSyncReload(tx.db.freepages())
  248. } else {
  249. // Read free page list from freelist page.
  250. tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
  251. }
  252. }
  253. }
  254. tx.close()
  255. }
  256. func (tx *Tx) close() {
  257. if tx.db == nil {
  258. return
  259. }
  260. if tx.writable {
  261. // Grab freelist stats.
  262. var freelistFreeN = tx.db.freelist.free_count()
  263. var freelistPendingN = tx.db.freelist.pending_count()
  264. var freelistAlloc = tx.db.freelist.size()
  265. // Remove transaction ref & writer lock.
  266. tx.db.rwtx = nil
  267. tx.db.rwlock.Unlock()
  268. // Merge statistics.
  269. tx.db.statlock.Lock()
  270. tx.db.stats.FreePageN = freelistFreeN
  271. tx.db.stats.PendingPageN = freelistPendingN
  272. tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
  273. tx.db.stats.FreelistInuse = freelistAlloc
  274. tx.db.stats.TxStats.add(&tx.stats)
  275. tx.db.statlock.Unlock()
  276. } else {
  277. tx.db.removeTx(tx)
  278. }
  279. // Clear all references.
  280. tx.db = nil
  281. tx.meta = nil
  282. tx.root = Bucket{tx: tx}
  283. tx.pages = nil
  284. }
  285. // Copy writes the entire database to a writer.
  286. // This function exists for backwards compatibility.
  287. //
  288. // Deprecated; Use WriteTo() instead.
  289. func (tx *Tx) Copy(w io.Writer) error {
  290. _, err := tx.WriteTo(w)
  291. return err
  292. }
  293. // WriteTo writes the entire database to a writer.
  294. // If err == nil then exactly tx.Size() bytes will be written into the writer.
  295. func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
  296. // Attempt to open reader with WriteFlag
  297. f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
  298. if err != nil {
  299. return 0, err
  300. }
  301. defer func() {
  302. if cerr := f.Close(); err == nil {
  303. err = cerr
  304. }
  305. }()
  306. // Generate a meta page. We use the same page data for both meta pages.
  307. buf := make([]byte, tx.db.pageSize)
  308. page := (*page)(unsafe.Pointer(&buf[0]))
  309. page.flags = metaPageFlag
  310. *page.meta() = *tx.meta
  311. // Write meta 0.
  312. page.id = 0
  313. page.meta().checksum = page.meta().sum64()
  314. nn, err := w.Write(buf)
  315. n += int64(nn)
  316. if err != nil {
  317. return n, fmt.Errorf("meta 0 copy: %s", err)
  318. }
  319. // Write meta 1 with a lower transaction id.
  320. page.id = 1
  321. page.meta().txid -= 1
  322. page.meta().checksum = page.meta().sum64()
  323. nn, err = w.Write(buf)
  324. n += int64(nn)
  325. if err != nil {
  326. return n, fmt.Errorf("meta 1 copy: %s", err)
  327. }
  328. // Move past the meta pages in the file.
  329. if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
  330. return n, fmt.Errorf("seek: %s", err)
  331. }
  332. // Copy data pages.
  333. wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
  334. n += wn
  335. if err != nil {
  336. return n, err
  337. }
  338. return n, nil
  339. }
  340. // CopyFile copies the entire database to file at the given path.
  341. // A reader transaction is maintained during the copy so it is safe to continue
  342. // using the database while a copy is in progress.
  343. func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
  344. f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
  345. if err != nil {
  346. return err
  347. }
  348. _, err = tx.WriteTo(f)
  349. if err != nil {
  350. _ = f.Close()
  351. return err
  352. }
  353. return f.Close()
  354. }
  355. // allocate returns a contiguous block of memory starting at a given page.
  356. func (tx *Tx) allocate(count int) (*page, error) {
  357. p, err := tx.db.allocate(tx.meta.txid, count)
  358. if err != nil {
  359. return nil, err
  360. }
  361. // Save to our page cache.
  362. tx.pages[p.id] = p
  363. // Update statistics.
  364. tx.stats.IncPageCount(int64(count))
  365. tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))
  366. return p, nil
  367. }
  368. // write writes any dirty pages to disk.
  369. func (tx *Tx) write() error {
  370. // Sort pages by id.
  371. pages := make(pages, 0, len(tx.pages))
  372. for _, p := range tx.pages {
  373. pages = append(pages, p)
  374. }
  375. // Clear out page cache early.
  376. tx.pages = make(map[pgid]*page)
  377. sort.Sort(pages)
  378. // Write pages to disk in order.
  379. for _, p := range pages {
  380. rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
  381. offset := int64(p.id) * int64(tx.db.pageSize)
  382. var written uintptr
  383. // Write out page in "max allocation" sized chunks.
  384. for {
  385. sz := rem
  386. if sz > maxAllocSize-1 {
  387. sz = maxAllocSize - 1
  388. }
  389. buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))
  390. if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
  391. return err
  392. }
  393. // Update statistics.
  394. tx.stats.IncWrite(1)
  395. // Exit inner for loop if we've written all the chunks.
  396. rem -= sz
  397. if rem == 0 {
  398. break
  399. }
  400. // Otherwise move offset forward and move pointer to next chunk.
  401. offset += int64(sz)
  402. written += uintptr(sz)
  403. }
  404. }
  405. // Ignore file sync if flag is set on DB.
  406. if !tx.db.NoSync || IgnoreNoSync {
  407. if err := fdatasync(tx.db); err != nil {
  408. return err
  409. }
  410. }
  411. // Put small pages back to page pool.
  412. for _, p := range pages {
  413. // Ignore page sizes over 1 page.
  414. // These are allocated using make() instead of the page pool.
  415. if int(p.overflow) != 0 {
  416. continue
  417. }
  418. buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)
  419. // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
  420. for i := range buf {
  421. buf[i] = 0
  422. }
  423. tx.db.pagePool.Put(buf) //nolint:staticcheck
  424. }
  425. return nil
  426. }
  427. // writeMeta writes the meta to the disk.
  428. func (tx *Tx) writeMeta() error {
  429. // Create a temporary buffer for the meta page.
  430. buf := make([]byte, tx.db.pageSize)
  431. p := tx.db.pageInBuffer(buf, 0)
  432. tx.meta.write(p)
  433. // Write the meta page to file.
  434. if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
  435. return err
  436. }
  437. if !tx.db.NoSync || IgnoreNoSync {
  438. if err := fdatasync(tx.db); err != nil {
  439. return err
  440. }
  441. }
  442. // Update statistics.
  443. tx.stats.IncWrite(1)
  444. return nil
  445. }
  446. // page returns a reference to the page with a given id.
  447. // If page has been written to then a temporary buffered page is returned.
  448. func (tx *Tx) page(id pgid) *page {
  449. // Check the dirty pages first.
  450. if tx.pages != nil {
  451. if p, ok := tx.pages[id]; ok {
  452. p.fastCheck(id)
  453. return p
  454. }
  455. }
  456. // Otherwise return directly from the mmap.
  457. p := tx.db.page(id)
  458. p.fastCheck(id)
  459. return p
  460. }
  461. // forEachPage iterates over every page within a given page and executes a function.
  462. func (tx *Tx) forEachPage(pgidnum pgid, fn func(*page, int, []pgid)) {
  463. stack := make([]pgid, 10)
  464. stack[0] = pgidnum
  465. tx.forEachPageInternal(stack[:1], fn)
  466. }
  467. func (tx *Tx) forEachPageInternal(pgidstack []pgid, fn func(*page, int, []pgid)) {
  468. p := tx.page(pgidstack[len(pgidstack)-1])
  469. // Execute function.
  470. fn(p, len(pgidstack)-1, pgidstack)
  471. // Recursively loop over children.
  472. if (p.flags & branchPageFlag) != 0 {
  473. for i := 0; i < int(p.count); i++ {
  474. elem := p.branchPageElement(uint16(i))
  475. tx.forEachPageInternal(append(pgidstack, elem.pgid), fn)
  476. }
  477. }
  478. }
  479. // Page returns page information for a given page number.
  480. // This is only safe for concurrent use when used by a writable transaction.
  481. func (tx *Tx) Page(id int) (*PageInfo, error) {
  482. if tx.db == nil {
  483. return nil, ErrTxClosed
  484. } else if pgid(id) >= tx.meta.pgid {
  485. return nil, nil
  486. }
  487. if tx.db.freelist == nil {
  488. return nil, ErrFreePagesNotLoaded
  489. }
  490. // Build the page info.
  491. p := tx.db.page(pgid(id))
  492. info := &PageInfo{
  493. ID: id,
  494. Count: int(p.count),
  495. OverflowCount: int(p.overflow),
  496. }
  497. // Determine the type (or if it's free).
  498. if tx.db.freelist.freed(pgid(id)) {
  499. info.Type = "free"
  500. } else {
  501. info.Type = p.typ()
  502. }
  503. return info, nil
  504. }
  505. // TxStats represents statistics about the actions performed by the transaction.
  506. type TxStats struct {
  507. // Page statistics.
  508. //
  509. // DEPRECATED: Use GetPageCount() or IncPageCount()
  510. PageCount int64 // number of page allocations
  511. // DEPRECATED: Use GetPageAlloc() or IncPageAlloc()
  512. PageAlloc int64 // total bytes allocated
  513. // Cursor statistics.
  514. //
  515. // DEPRECATED: Use GetCursorCount() or IncCursorCount()
  516. CursorCount int64 // number of cursors created
  517. // Node statistics
  518. //
  519. // DEPRECATED: Use GetNodeCount() or IncNodeCount()
  520. NodeCount int64 // number of node allocations
  521. // DEPRECATED: Use GetNodeDeref() or IncNodeDeref()
  522. NodeDeref int64 // number of node dereferences
  523. // Rebalance statistics.
  524. //
  525. // DEPRECATED: Use GetRebalance() or IncRebalance()
  526. Rebalance int64 // number of node rebalances
  527. // DEPRECATED: Use GetRebalanceTime() or IncRebalanceTime()
  528. RebalanceTime time.Duration // total time spent rebalancing
  529. // Split/Spill statistics.
  530. //
  531. // DEPRECATED: Use GetSplit() or IncSplit()
  532. Split int64 // number of nodes split
  533. // DEPRECATED: Use GetSpill() or IncSpill()
  534. Spill int64 // number of nodes spilled
  535. // DEPRECATED: Use GetSpillTime() or IncSpillTime()
  536. SpillTime time.Duration // total time spent spilling
  537. // Write statistics.
  538. //
  539. // DEPRECATED: Use GetWrite() or IncWrite()
  540. Write int64 // number of writes performed
  541. // DEPRECATED: Use GetWriteTime() or IncWriteTime()
  542. WriteTime time.Duration // total time spent writing to disk
  543. }
  544. func (s *TxStats) add(other *TxStats) {
  545. s.IncPageCount(other.GetPageCount())
  546. s.IncPageAlloc(other.GetPageAlloc())
  547. s.IncCursorCount(other.GetCursorCount())
  548. s.IncNodeCount(other.GetNodeCount())
  549. s.IncNodeDeref(other.GetNodeDeref())
  550. s.IncRebalance(other.GetRebalance())
  551. s.IncRebalanceTime(other.GetRebalanceTime())
  552. s.IncSplit(other.GetSplit())
  553. s.IncSpill(other.GetSpill())
  554. s.IncSpillTime(other.GetSpillTime())
  555. s.IncWrite(other.GetWrite())
  556. s.IncWriteTime(other.GetWriteTime())
  557. }
  558. // Sub calculates and returns the difference between two sets of transaction stats.
  559. // This is useful when obtaining stats at two different points and time and
  560. // you need the performance counters that occurred within that time span.
  561. func (s *TxStats) Sub(other *TxStats) TxStats {
  562. var diff TxStats
  563. diff.PageCount = s.GetPageCount() - other.GetPageCount()
  564. diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
  565. diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
  566. diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
  567. diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
  568. diff.Rebalance = s.GetRebalance() - other.GetRebalance()
  569. diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
  570. diff.Split = s.GetSplit() - other.GetSplit()
  571. diff.Spill = s.GetSpill() - other.GetSpill()
  572. diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
  573. diff.Write = s.GetWrite() - other.GetWrite()
  574. diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
  575. return diff
  576. }
  577. // GetPageCount returns PageCount atomically.
  578. func (s *TxStats) GetPageCount() int64 {
  579. return atomic.LoadInt64(&s.PageCount)
  580. }
  581. // IncPageCount increases PageCount atomically and returns the new value.
  582. func (s *TxStats) IncPageCount(delta int64) int64 {
  583. return atomic.AddInt64(&s.PageCount, delta)
  584. }
  585. // GetPageAlloc returns PageAlloc atomically.
  586. func (s *TxStats) GetPageAlloc() int64 {
  587. return atomic.LoadInt64(&s.PageAlloc)
  588. }
  589. // IncPageAlloc increases PageAlloc atomically and returns the new value.
  590. func (s *TxStats) IncPageAlloc(delta int64) int64 {
  591. return atomic.AddInt64(&s.PageAlloc, delta)
  592. }
  593. // GetCursorCount returns CursorCount atomically.
  594. func (s *TxStats) GetCursorCount() int64 {
  595. return atomic.LoadInt64(&s.CursorCount)
  596. }
  597. // IncCursorCount increases CursorCount atomically and return the new value.
  598. func (s *TxStats) IncCursorCount(delta int64) int64 {
  599. return atomic.AddInt64(&s.CursorCount, delta)
  600. }
  601. // GetNodeCount returns NodeCount atomically.
  602. func (s *TxStats) GetNodeCount() int64 {
  603. return atomic.LoadInt64(&s.NodeCount)
  604. }
  605. // IncNodeCount increases NodeCount atomically and returns the new value.
  606. func (s *TxStats) IncNodeCount(delta int64) int64 {
  607. return atomic.AddInt64(&s.NodeCount, delta)
  608. }
  609. // GetNodeDeref returns NodeDeref atomically.
  610. func (s *TxStats) GetNodeDeref() int64 {
  611. return atomic.LoadInt64(&s.NodeDeref)
  612. }
  613. // IncNodeDeref increases NodeDeref atomically and returns the new value.
  614. func (s *TxStats) IncNodeDeref(delta int64) int64 {
  615. return atomic.AddInt64(&s.NodeDeref, delta)
  616. }
  617. // GetRebalance returns Rebalance atomically.
  618. func (s *TxStats) GetRebalance() int64 {
  619. return atomic.LoadInt64(&s.Rebalance)
  620. }
  621. // IncRebalance increases Rebalance atomically and returns the new value.
  622. func (s *TxStats) IncRebalance(delta int64) int64 {
  623. return atomic.AddInt64(&s.Rebalance, delta)
  624. }
  625. // GetRebalanceTime returns RebalanceTime atomically.
  626. func (s *TxStats) GetRebalanceTime() time.Duration {
  627. return atomicLoadDuration(&s.RebalanceTime)
  628. }
  629. // IncRebalanceTime increases RebalanceTime atomically and returns the new value.
  630. func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
  631. return atomicAddDuration(&s.RebalanceTime, delta)
  632. }
  633. // GetSplit returns Split atomically.
  634. func (s *TxStats) GetSplit() int64 {
  635. return atomic.LoadInt64(&s.Split)
  636. }
  637. // IncSplit increases Split atomically and returns the new value.
  638. func (s *TxStats) IncSplit(delta int64) int64 {
  639. return atomic.AddInt64(&s.Split, delta)
  640. }
  641. // GetSpill returns Spill atomically.
  642. func (s *TxStats) GetSpill() int64 {
  643. return atomic.LoadInt64(&s.Spill)
  644. }
  645. // IncSpill increases Spill atomically and returns the new value.
  646. func (s *TxStats) IncSpill(delta int64) int64 {
  647. return atomic.AddInt64(&s.Spill, delta)
  648. }
  649. // GetSpillTime returns SpillTime atomically.
  650. func (s *TxStats) GetSpillTime() time.Duration {
  651. return atomicLoadDuration(&s.SpillTime)
  652. }
  653. // IncSpillTime increases SpillTime atomically and returns the new value.
  654. func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
  655. return atomicAddDuration(&s.SpillTime, delta)
  656. }
  657. // GetWrite returns Write atomically.
  658. func (s *TxStats) GetWrite() int64 {
  659. return atomic.LoadInt64(&s.Write)
  660. }
  661. // IncWrite increases Write atomically and returns the new value.
  662. func (s *TxStats) IncWrite(delta int64) int64 {
  663. return atomic.AddInt64(&s.Write, delta)
  664. }
  665. // GetWriteTime returns WriteTime atomically.
  666. func (s *TxStats) GetWriteTime() time.Duration {
  667. return atomicLoadDuration(&s.WriteTime)
  668. }
  669. // IncWriteTime increases WriteTime atomically and returns the new value.
  670. func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
  671. return atomicAddDuration(&s.WriteTime, delta)
  672. }
  673. func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
  674. return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)))
  675. }
  676. func atomicLoadDuration(ptr *time.Duration) time.Duration {
  677. return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr))))
  678. }