etcd.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lockman
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "runtime/debug"
  20. "strings"
  21. "sync"
  22. "time"
  23. clientv3 "go.etcd.io/etcd/client/v3"
  24. "go.etcd.io/etcd/client/v3/concurrency"
  25. "google.golang.org/grpc"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/onecloud/pkg/util/atexit"
  29. )
  30. type SEtcdLockRecord struct {
  31. m *sync.Mutex
  32. depth int
  33. pfx string
  34. sess *concurrency.Session
  35. em *concurrency.Mutex
  36. }
  37. func newEtcdLockRecord(ctx context.Context, lockman *SEtcdLockManager, key string) *SEtcdLockRecord {
  38. pfx := fmt.Sprintf("%s/%s", lockman.config.LockPrefix, key)
  39. sess, err := concurrency.NewSession(lockman.cli, concurrency.WithTTL(lockman.config.LockTTL))
  40. if err != nil {
  41. panic(fmt.Sprintf("%s: create etcd session: %v", pfx, err))
  42. }
  43. em := concurrency.NewMutex(sess, pfx)
  44. rec := SEtcdLockRecord{
  45. m: &sync.Mutex{},
  46. depth: 0,
  47. pfx: pfx,
  48. sess: sess,
  49. em: em,
  50. }
  51. return &rec
  52. }
  53. func (rec *SEtcdLockRecord) lockContext(ctx context.Context) {
  54. rec.m.Lock()
  55. defer rec.m.Unlock()
  56. rec.depth += 1
  57. if rec.depth > 32 {
  58. // NOTE callers are responsible for ensuring unlock got called
  59. bug("%s: depth > 32", rec.pfx)
  60. panic(debug.Stack())
  61. }
  62. if err := rec.em.Lock(ctx); err != nil {
  63. msg := fmt.Sprintf("%s: etcd lock: %v", rec.pfx, err)
  64. panic(msg)
  65. }
  66. if debug_log {
  67. log.Infof("%s: lock depth %d\n%s", rec.pfx, rec.depth, debug.Stack())
  68. }
  69. }
  70. func (rec *SEtcdLockRecord) unlockContext(ctx context.Context) (needClean bool) {
  71. rec.m.Lock()
  72. defer rec.m.Unlock()
  73. if debug_log {
  74. log.Infof("%s: unlock depth %d\n%s", rec.pfx, rec.depth, debug.Stack())
  75. }
  76. rec.depth -= 1
  77. if rec.depth <= 0 {
  78. if rec.depth < 0 {
  79. bug("%s: overly unlocked", rec.pfx)
  80. }
  81. // Other players can make progress once lock record got removed
  82. // after this
  83. //
  84. // There is no need to unlock rec.em. Revoke the session lease
  85. // will trigger etcd to do that. Should the remote revoke call
  86. // fail, we expect the lease auto-refresh to stop and the lease
  87. // will expire in known time limit.
  88. closed := false
  89. for i := 0; i < 3; i++ {
  90. if err := rec.sess.Close(); err != nil {
  91. log.Errorf("%s: session close: %s", rec.pfx, err)
  92. if i < 2 {
  93. time.Sleep(time.Second)
  94. }
  95. continue
  96. }
  97. closed = true
  98. break
  99. }
  100. if !closed {
  101. log.Errorf("%s: session close failure", rec.pfx)
  102. }
  103. return true
  104. }
  105. return false
  106. }
  107. type SEtcdLockManagerConfig struct {
  108. Endpoints []string
  109. Username string
  110. Password string
  111. TLS *tls.Config
  112. LockTTL int
  113. LockPrefix string
  114. dialOptions []grpc.DialOption
  115. dialTimeout time.Duration
  116. }
  117. func (config *SEtcdLockManagerConfig) validate() error {
  118. if len(config.Endpoints) == 0 {
  119. return fmt.Errorf("no etcd endpoint configured")
  120. }
  121. if config.dialTimeout <= 0 {
  122. config.dialTimeout = 3 * time.Second
  123. }
  124. if len(config.dialOptions) == 0 {
  125. // let it fail right away
  126. config.dialOptions = []grpc.DialOption{
  127. grpc.WithBlock(),
  128. grpc.WithTimeout(500 * time.Millisecond),
  129. }
  130. }
  131. if config.LockTTL <= 0 {
  132. config.LockTTL = 10
  133. }
  134. config.LockPrefix = strings.TrimSpace(config.LockPrefix)
  135. config.LockPrefix = strings.TrimRight(config.LockPrefix, "/")
  136. if config.LockPrefix == "" {
  137. return fmt.Errorf("empty etcd lock prefix")
  138. }
  139. return nil
  140. }
  141. type SLockTableIndex struct {
  142. key string
  143. holder context.Context
  144. }
  145. type SEtcdLockManager struct {
  146. *SBaseLockManager
  147. tableLock *sync.Mutex
  148. lockTable map[SLockTableIndex]*SEtcdLockRecord
  149. config *SEtcdLockManagerConfig
  150. cli *clientv3.Client
  151. }
  152. func NewEtcdLockManager(config *SEtcdLockManagerConfig) (ILockManager, error) {
  153. if err := config.validate(); err != nil {
  154. return nil, err
  155. }
  156. cli, err := clientv3.New(clientv3.Config{
  157. Endpoints: config.Endpoints,
  158. Username: config.Username,
  159. Password: config.Password,
  160. TLS: config.TLS,
  161. DialOptions: config.dialOptions,
  162. DialTimeout: config.dialTimeout,
  163. })
  164. if err != nil {
  165. return nil, errors.Wrap(err, "new etcd client")
  166. }
  167. lockman := SEtcdLockManager{
  168. tableLock: &sync.Mutex{},
  169. lockTable: map[SLockTableIndex]*SEtcdLockRecord{},
  170. cli: cli,
  171. config: config,
  172. }
  173. atexit.Register(atexit.ExitHandler{
  174. Prio: atexit.PRIO_LOG_OTHER,
  175. Reason: "etcd-lockman",
  176. Value: lockman,
  177. Func: atexit.ExitHandlerFunc(lockman.destroyAtExit),
  178. })
  179. lockman.SBaseLockManager = NewBaseLockManger(&lockman)
  180. return &lockman, nil
  181. }
  182. func (lockman *SEtcdLockManager) destroyAtExit(eh atexit.ExitHandler) {
  183. log.Infof("closing etcd lockman session")
  184. for _, rec := range lockman.lockTable {
  185. if err := rec.sess.Close(); err != nil {
  186. log.Errorf("%s: session close: %v", rec.pfx, err)
  187. }
  188. }
  189. if err := lockman.cli.Close(); err != nil {
  190. log.Errorf("etcd lockman client close: %v", err)
  191. }
  192. }
  193. func (lockman *SEtcdLockManager) getRecordWithLock(ctx context.Context, key string) *SEtcdLockRecord {
  194. lockman.tableLock.Lock()
  195. defer lockman.tableLock.Unlock()
  196. return lockman.getRecord(ctx, key, true)
  197. }
  198. func (lockman *SEtcdLockManager) getRecord(ctx context.Context, key string, alloc bool) *SEtcdLockRecord {
  199. idx := SLockTableIndex{
  200. key: key,
  201. holder: ctx,
  202. }
  203. _, ok := lockman.lockTable[idx]
  204. if !ok {
  205. if !alloc {
  206. return nil
  207. }
  208. lockman.lockTable[idx] = newEtcdLockRecord(ctx, lockman, key)
  209. }
  210. return lockman.lockTable[idx]
  211. }
  212. func (lockman *SEtcdLockManager) LockKey(ctx context.Context, key string) {
  213. record := lockman.getRecordWithLock(ctx, key)
  214. record.lockContext(ctx)
  215. }
  216. func (lockman *SEtcdLockManager) UnlockKey(ctx context.Context, key string) {
  217. lockman.tableLock.Lock()
  218. defer lockman.tableLock.Unlock()
  219. record := lockman.getRecord(ctx, key, false)
  220. if record == nil {
  221. bug("%s: unlock a non-existent lock\n%s", key, debug.Stack())
  222. return
  223. }
  224. needClean := record.unlockContext(ctx)
  225. if needClean {
  226. idx := SLockTableIndex{
  227. key: key,
  228. holder: ctx,
  229. }
  230. delete(lockman.lockTable, idx)
  231. }
  232. }