etcd.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package informer
  15. import (
  16. "context"
  17. "fmt"
  18. "path/filepath"
  19. "strings"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/onecloud/pkg/cloudcommon/etcd"
  24. "yunion.io/x/onecloud/pkg/util/ctx"
  25. )
  26. const (
  27. EtcdInformerPrefix = "/onecloud/informer"
  28. EtcdInformerClientsKey = "@clients"
  29. EventTypeCreate = "CREATE"
  30. EventTypeUpdate = "UPDATE"
  31. EventTypeDelete = "DELETE"
  32. )
  33. type TEventType string
  34. type modelObject struct {
  35. EventType TEventType `json:"event_type"`
  36. Object *jsonutils.JSONDict `json:"object"`
  37. OldObject *jsonutils.JSONDict `json:"old_object"`
  38. }
  39. func (obj modelObject) ToKey() string {
  40. return jsonutils.Marshal(obj).String()
  41. }
  42. func newModelObjectFromValue(val []byte) (*modelObject, error) {
  43. jObj, err := jsonutils.Parse(val)
  44. if err != nil {
  45. return nil, errors.Wrapf(err, "parse key %s", val)
  46. }
  47. ret := new(modelObject)
  48. if err := jObj.Unmarshal(ret); err != nil {
  49. return nil, errors.Wrap(err, "unmarshal to model object")
  50. }
  51. return ret, nil
  52. }
  53. type EtcdBackend struct {
  54. client *etcd.SEtcdClient
  55. leaseTTL int64
  56. }
  57. func newEtcdBackend(opt *etcd.SEtcdOptions, onKeepaliveFailure func()) (*EtcdBackend, error) {
  58. opt.EtcdNamspace = EtcdInformerPrefix
  59. be := new(EtcdBackend)
  60. be.leaseTTL = int64(opt.EtcdLeaseExpireSeconds)
  61. if onKeepaliveFailure == nil {
  62. onKeepaliveFailure = be.onKeepaliveFailure
  63. }
  64. cli, err := etcd.NewEtcdClient(opt, onKeepaliveFailure)
  65. if err != nil {
  66. return nil, errors.Wrap(err, "new etcd client")
  67. }
  68. be.client = cli
  69. return be, nil
  70. }
  71. func NewEtcdBackend(opt *etcd.SEtcdOptions, onKeepaliveFailure func()) (*EtcdBackend, error) {
  72. be, err := newEtcdBackend(opt, onKeepaliveFailure)
  73. if err != nil {
  74. return nil, err
  75. }
  76. ctx := ctx.CtxWithTime()
  77. be.initClientResources(ctx)
  78. be.StartClientWatch(ctx)
  79. return be, nil
  80. }
  81. func (b *EtcdBackend) initClientResources(ctx context.Context) error {
  82. pairs, err := b.client.List(ctx, "/")
  83. if err != nil {
  84. return errors.Wrap(err, "list all client resources")
  85. }
  86. for _, pair := range pairs {
  87. keywordPlural, err := b.getClientWatchResource([]byte(pair.Key))
  88. if err == nil && len(keywordPlural) != 0 {
  89. AddWatchedResources(keywordPlural)
  90. }
  91. }
  92. return nil
  93. }
  94. func (b *EtcdBackend) getObjectKey(obj *ModelObject) string {
  95. if obj.IsJoint {
  96. return fmt.Sprintf("/%s/%s/%s", obj.KeywordPlural, obj.MasterId, obj.SlaveId)
  97. }
  98. return fmt.Sprintf("/%s/%s", obj.KeywordPlural, obj.Id)
  99. }
  100. func (b *EtcdBackend) getValue(eventType TEventType, obj *ModelObject) string {
  101. modelObject := modelObject{
  102. EventType: eventType,
  103. Object: obj.Object,
  104. }
  105. return modelObject.ToKey()
  106. }
  107. func (b *EtcdBackend) getUpdateValue(obj *ModelObject, oldObj *jsonutils.JSONDict) string {
  108. modelObject := modelObject{
  109. EventType: EventTypeUpdate,
  110. Object: obj.Object,
  111. OldObject: oldObj,
  112. }
  113. return modelObject.ToKey()
  114. }
  115. func (b *EtcdBackend) GetType() string {
  116. return "etcd"
  117. }
  118. func (b *EtcdBackend) Create(ctx context.Context, obj *ModelObject) error {
  119. key := b.getObjectKey(obj)
  120. val := b.getValue(EventTypeCreate, obj)
  121. return b.put(ctx, key, val)
  122. }
  123. func (b *EtcdBackend) Update(ctx context.Context, obj *ModelObject, oldObj *jsonutils.JSONDict) error {
  124. key := b.getObjectKey(obj)
  125. val := b.getUpdateValue(obj, oldObj)
  126. return b.put(ctx, key, val)
  127. }
  128. func (b *EtcdBackend) Delete(ctx context.Context, obj *ModelObject) error {
  129. key := b.getObjectKey(obj)
  130. val := b.getValue(EventTypeDelete, obj)
  131. err := b.put(ctx, key, val)
  132. return err
  133. }
  134. func (b *EtcdBackend) put(ctx context.Context, key, val string) error {
  135. return b.PutWithLease(ctx, key, val, b.leaseTTL)
  136. }
  137. func (b *EtcdBackend) PutSession(ctx context.Context, key, val string) error {
  138. return b.client.PutSession(ctx, key, val)
  139. }
  140. func (b *EtcdBackend) PutWithLease(ctx context.Context, key, val string, ttlSeconds int64) error {
  141. return b.client.PutWithLease(ctx, key, val, ttlSeconds)
  142. }
  143. func (b *EtcdBackend) onKeepaliveFailure() {
  144. if err := b.client.RestartSession(); err != nil {
  145. log.Errorf("restart etcd session error: %v", err)
  146. return
  147. }
  148. b.StartClientWatch(context.Background())
  149. }
  150. func (b *EtcdBackend) StartClientWatch(ctx context.Context) {
  151. b.client.Unwatch("/")
  152. b.client.Watch(ctx, "/", b.onClientResourceCreate, b.onClientResourceUpdate, b.onClientResourceDelete)
  153. }
  154. func (b *EtcdBackend) isClientsKey(key []byte) bool {
  155. return strings.Contains(string(key), EtcdInformerClientsKey)
  156. }
  157. func (b *EtcdBackend) getClientWatchResource(key []byte) (string, error) {
  158. // key is like: /servers/@clients/default-climc-5d4c8d49f6-p6l68
  159. keyPath := string(key)
  160. parts := strings.Split(keyPath, "/")
  161. if len(parts) != 4 {
  162. return "", errors.Errorf("invalid client resource key: %v", parts)
  163. }
  164. if parts[2] != EtcdInformerClientsKey {
  165. return "", errors.Errorf("key %s not contains %s", keyPath, EtcdInformerClientsKey)
  166. }
  167. return parts[1], nil
  168. }
  169. func (b *EtcdBackend) onClientResourceAdd(key []byte) {
  170. if !b.isClientsKey(key) {
  171. return
  172. }
  173. keywordPlural, err := b.getClientWatchResource(key)
  174. if err != nil {
  175. log.Errorf("get client watch resource error: %v", err)
  176. return
  177. }
  178. AddWatchedResources(keywordPlural)
  179. }
  180. func (b *EtcdBackend) onClientResourceCreate(ctx context.Context, key, value []byte) {
  181. b.onClientResourceAdd(key)
  182. }
  183. func (b *EtcdBackend) onClientResourceUpdate(ctx context.Context, key, oldvalue, value []byte) {
  184. b.onClientResourceAdd(key)
  185. }
  186. func (b *EtcdBackend) shouldDeleteWatchedResource(ctx context.Context, keywordPlural string) bool {
  187. // clientsKey is like: /servers/@clients
  188. clientsKey := fmt.Sprintf("/%s/%s", keywordPlural, EtcdInformerClientsKey)
  189. pairs, err := b.client.List(ctx, clientsKey)
  190. if err != nil {
  191. log.Errorf("list clientsKey %s error: %v", clientsKey, err)
  192. return false
  193. }
  194. return len(pairs) == 0
  195. }
  196. func (b *EtcdBackend) onClientResourceDelete(ctx context.Context, key []byte) {
  197. if !b.isClientsKey(key) {
  198. return
  199. }
  200. keywordPlural, err := b.getClientWatchResource(key)
  201. if err != nil {
  202. log.Errorf("get client watch resource keywordPlural error: %v", err)
  203. return
  204. }
  205. if b.shouldDeleteWatchedResource(ctx, keywordPlural) {
  206. DeleteWatchedResources(keywordPlural)
  207. }
  208. }
  209. func (b *EtcdBackend) getWatchKey(key string) string {
  210. return filepath.Join("/", key)
  211. }