inmemory.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lockman
  15. import (
  16. "context"
  17. "runtime/debug"
  18. "sync"
  19. "github.com/petermattis/goid"
  20. "yunion.io/x/log"
  21. )
  22. var (
  23. debug_log = false
  24. )
  25. /*type SInMemoryLockOwner struct {
  26. owner context.Context
  27. }*/
  28. type SInMemoryLockRecord struct {
  29. key string
  30. lock *sync.Mutex
  31. cond *sync.Cond
  32. holder int64
  33. depth int
  34. waiter *FIFO
  35. }
  36. func newInMemoryLockRecord(ctxDummy context.Context) *SInMemoryLockRecord {
  37. lock := &sync.Mutex{}
  38. cond := sync.NewCond(lock)
  39. rec := SInMemoryLockRecord{lock: lock, cond: cond, holder: -1, depth: 0, waiter: NewFIFO()}
  40. return &rec
  41. }
  42. func (rec *SInMemoryLockRecord) fatalf(fmtStr string, args ...interface{}) {
  43. debug.PrintStack()
  44. log.Fatalf(fmtStr, args...)
  45. }
  46. func (rec *SInMemoryLockRecord) lockContext(ctxDummy context.Context) {
  47. rec.lock.Lock()
  48. defer rec.lock.Unlock()
  49. curGoid := goid.Get()
  50. if rec.holder < 0 {
  51. if debug_log {
  52. log.Debugf("lockContext: curGoid=[%d] key=[%s] create new record", curGoid, rec.key)
  53. }
  54. rec.holder = curGoid
  55. rec.depth = 1
  56. return
  57. }
  58. if debug_log {
  59. log.Debugf("rec.hold=[%d] ctx=[%d] %v key=[%s]", rec.holder, curGoid, rec.holder == curGoid, rec.key)
  60. }
  61. if rec.holder == curGoid {
  62. rec.depth += 1
  63. if debug_log {
  64. log.Infof("lockContext: same ctx, depth: %d holder=[%d] ctx=[%d] key=[%s]", rec.depth, rec.holder, curGoid, rec.key)
  65. }
  66. if rec.depth > 32 {
  67. // XXX MUST BE BUG ???
  68. rec.fatalf("Too many recursive locks!!! key=[%s]", rec.key)
  69. }
  70. return
  71. }
  72. // check
  73. rec.waiter.Enum(func(ele interface{}) {
  74. electx := ele.(int64)
  75. if electx == curGoid {
  76. rec.fatalf("try to lock from a waiter context???? curGoid=[%d] waiterGoid=[%d] key=[%s]", curGoid, electx, rec.key)
  77. }
  78. })
  79. rec.waiter.Push(curGoid)
  80. if debug_log {
  81. log.Debugf("waiter size %d after push curGoid=[%d]", rec.waiter.Len(), curGoid)
  82. log.Debugf("Start to wait ... holder=[%d] curGoid [%d] key=[%s]", rec.holder, curGoid, rec.key)
  83. }
  84. for rec.holder >= 0 {
  85. rec.cond.Wait()
  86. }
  87. if debug_log {
  88. log.Debugf("End of wait ... holder=[%d] curGoid [%d] key=[%s]", rec.holder, curGoid, rec.key)
  89. }
  90. rec.waiter.Pop(curGoid)
  91. if debug_log {
  92. log.Debugf("waiter size %d after pop curGoid=[%d] key=[%s]", rec.waiter.Len(), curGoid, rec.key)
  93. }
  94. rec.holder = curGoid
  95. rec.depth = 1
  96. }
  97. func (rec *SInMemoryLockRecord) unlockContext(ctxDummy context.Context) (needClean bool) {
  98. rec.lock.Lock()
  99. defer rec.lock.Unlock()
  100. curGoid := goid.Get()
  101. if rec.holder != curGoid {
  102. rec.fatalf("try to unlock a wait context??? key=[%s] holder=[%d] curGoid=[%d]", rec.key, rec.holder, curGoid)
  103. }
  104. if debug_log {
  105. log.Debugf("unlockContext depth %d curGoid=[%d] key=[%s]", rec.depth, curGoid, rec.key)
  106. }
  107. rec.depth -= 1
  108. if rec.depth <= 0 {
  109. if debug_log {
  110. log.Debugf("depth 0, to release lock for context curGoid=[%d] key=[%s]", curGoid, rec.key)
  111. }
  112. rec.holder = -1
  113. if rec.waiter.Len() == 0 {
  114. return true
  115. }
  116. rec.cond.Signal()
  117. }
  118. return false
  119. }
  120. type SInMemoryLockManager struct {
  121. *SBaseLockManager
  122. tableLock *sync.Mutex
  123. lockTable map[string]*SInMemoryLockRecord
  124. }
  125. func NewInMemoryLockManager() ILockManager {
  126. lockMan := SInMemoryLockManager{
  127. tableLock: &sync.Mutex{},
  128. lockTable: make(map[string]*SInMemoryLockRecord),
  129. }
  130. lockMan.SBaseLockManager = NewBaseLockManger(&lockMan)
  131. return &lockMan
  132. }
  133. func (lockman *SInMemoryLockManager) getRecordWithLock(ctx context.Context, key string, new bool) *SInMemoryLockRecord {
  134. lockman.tableLock.Lock()
  135. defer lockman.tableLock.Unlock()
  136. return lockman.getRecord(ctx, key, new)
  137. }
  138. func (lockman *SInMemoryLockManager) getRecord(ctx context.Context, key string, new bool) *SInMemoryLockRecord {
  139. _, ok := lockman.lockTable[key]
  140. if !ok {
  141. if !new {
  142. return nil
  143. }
  144. rec := newInMemoryLockRecord(ctx)
  145. rec.key = key
  146. lockman.lockTable[key] = rec
  147. }
  148. return lockman.lockTable[key]
  149. }
  150. func (lockman *SInMemoryLockManager) LockKey(ctx context.Context, key string) {
  151. record := lockman.getRecordWithLock(ctx, key, true)
  152. record.lockContext(ctx)
  153. }
  154. func (lockman *SInMemoryLockManager) UnlockKey(ctx context.Context, key string) {
  155. record := lockman.getRecordWithLock(ctx, key, false)
  156. if record == nil {
  157. log.Errorf("BUG: unlock an non-existent lock ctx: %p key: %s\n%s", ctx, key, debug.Stack())
  158. return
  159. }
  160. record.unlockContext(ctx)
  161. }