elect.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package elect
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "sync"
  19. "time"
  20. clientv3 "go.etcd.io/etcd/client/v3"
  21. "go.etcd.io/etcd/client/v3/concurrency"
  22. "google.golang.org/grpc"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/options"
  26. )
  27. type EtcdConfig struct {
  28. Endpoints []string
  29. Username string
  30. Password string
  31. TLS *tls.Config
  32. LockTTL int
  33. LockPrefix string
  34. opts *options.DBOptions
  35. }
  36. func NewEtcdConfigFromDBOptions(opts *options.DBOptions) (*EtcdConfig, error) {
  37. tlsCfg, err := opts.GetEtcdTLSConfig()
  38. if err != nil {
  39. return nil, errors.Wrap(err, "etcd tls config")
  40. }
  41. config := &EtcdConfig{
  42. Endpoints: opts.EtcdEndpoints,
  43. Username: opts.EtcdUsername,
  44. Password: opts.EtcdPassword,
  45. LockTTL: opts.EtcdLockTTL,
  46. LockPrefix: opts.EtcdLockPrefix,
  47. TLS: tlsCfg,
  48. }
  49. if config.LockTTL <= 0 {
  50. config.LockTTL = 5
  51. }
  52. return config, nil
  53. }
  54. type Elect struct {
  55. cli *clientv3.Client
  56. path string
  57. ttl int
  58. stopFunc context.CancelFunc
  59. config *EtcdConfig
  60. mutex *sync.Mutex
  61. latestEv electEvent
  62. subscribers []chan electEvent
  63. }
  64. type ticket struct {
  65. session *concurrency.Session
  66. mutex *concurrency.Mutex
  67. }
  68. func (t *ticket) tearup(ctx context.Context) {
  69. if t.session != nil {
  70. t.session.Close()
  71. t.session = nil
  72. }
  73. }
  74. func NewElect(config *EtcdConfig, key string) (*Elect, error) {
  75. cli, err := clientv3.New(clientv3.Config{
  76. Endpoints: config.Endpoints,
  77. Username: config.Username,
  78. Password: config.Password,
  79. TLS: config.TLS,
  80. DialOptions: []grpc.DialOption{
  81. grpc.WithBlock(),
  82. grpc.WithTimeout(3000 * time.Millisecond),
  83. },
  84. DialTimeout: 3 * time.Second,
  85. })
  86. if err != nil {
  87. return nil, errors.Wrap(err, "new etcd client")
  88. }
  89. elect := &Elect{
  90. cli: cli,
  91. path: config.LockPrefix + "/" + key,
  92. ttl: config.LockTTL,
  93. mutex: &sync.Mutex{},
  94. latestEv: electEventInit,
  95. }
  96. return elect, nil
  97. }
  98. func (elect *Elect) Stop() {
  99. elect.stopFunc()
  100. }
  101. func (elect *Elect) Start(ctx context.Context) {
  102. ctx, elect.stopFunc = context.WithCancel(ctx)
  103. prev := electEventLost
  104. for {
  105. select {
  106. case <-ctx.Done():
  107. log.Infof("elect bye")
  108. return
  109. default:
  110. now := electEventWin
  111. ticket, err := elect.do(ctx)
  112. if err != nil {
  113. ticket.tearup(ctx)
  114. now = electEventLost
  115. log.Errorf("elect error: %v", err)
  116. }
  117. if now != prev {
  118. log.Infof("notify elect event: %s -> %s", prev, now)
  119. prev = now
  120. elect.notify(ctx, now)
  121. }
  122. if err == nil {
  123. select {
  124. case <-ctx.Done():
  125. ticket.tearup(ctx)
  126. case <-ticket.session.Done():
  127. }
  128. } else {
  129. time.Sleep(3 * time.Second)
  130. }
  131. }
  132. }
  133. }
  134. // do joins an election. the 1st return argument ticket must always be non-nil
  135. func (elect *Elect) do(ctx context.Context) (*ticket, error) {
  136. r := &ticket{}
  137. sess, err := concurrency.NewSession(
  138. elect.cli,
  139. concurrency.WithTTL(elect.ttl),
  140. )
  141. if err != nil {
  142. return r, err
  143. }
  144. r.session = sess
  145. em := concurrency.NewMutex(sess, elect.path)
  146. if err := em.Lock(ctx); err != nil {
  147. return r, err
  148. }
  149. r.mutex = em
  150. return r, err
  151. }
  152. func (elect *Elect) subscribe(ctx context.Context, ch chan electEvent) {
  153. elect.mutex.Lock()
  154. defer elect.mutex.Unlock()
  155. elect.subscribers = append(elect.subscribers, ch)
  156. if ev := elect.latestEv; ev != electEventInit {
  157. elect.notifyOne(ctx, ev, ch)
  158. }
  159. }
  160. func (elect *Elect) notifyOne(ctx context.Context, ev electEvent, ch chan electEvent) {
  161. sent := false
  162. select {
  163. case ch <- ev:
  164. sent = true
  165. case <-ctx.Done():
  166. default:
  167. }
  168. if !sent {
  169. log.Errorf("elect event '%s' missed by %#v", ev, ch)
  170. }
  171. }
  172. func (elect *Elect) notify(ctx context.Context, ev electEvent) {
  173. elect.mutex.Lock()
  174. defer elect.mutex.Unlock()
  175. elect.latestEv = ev
  176. for _, ch := range elect.subscribers {
  177. elect.notifyOne(ctx, ev, ch)
  178. }
  179. }
  180. func (elect *Elect) SubscribeWithAction(ctx context.Context, onWin, onLost func()) {
  181. go func() {
  182. ch := make(chan electEvent, 3)
  183. var ev electEvent
  184. elect.subscribe(ctx, ch)
  185. for {
  186. select {
  187. case ev = <-ch:
  188. case <-ctx.Done():
  189. return
  190. }
  191. drain:
  192. for {
  193. select {
  194. case ev = <-ch:
  195. continue
  196. default:
  197. break drain
  198. }
  199. }
  200. log.Infof("elect event %s", ev)
  201. switch ev {
  202. case electEventWin:
  203. onWin()
  204. case electEventLost:
  205. onLost()
  206. }
  207. }
  208. }()
  209. }
  210. type electEvent int
  211. const (
  212. electEventWin electEvent = iota
  213. electEventLost
  214. electEventInit
  215. )
  216. func (ev electEvent) String() string {
  217. switch ev {
  218. case electEventWin:
  219. return "win"
  220. case electEventLost:
  221. return "lost"
  222. case electEventInit:
  223. return "init"
  224. default:
  225. return "unexpected"
  226. }
  227. }