watcher.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package informer
  15. import (
  16. "context"
  17. "time"
  18. "yunion.io/x/jsonutils"
  19. "yunion.io/x/log"
  20. "yunion.io/x/pkg/errors"
  21. "yunion.io/x/onecloud/pkg/cloudcommon/etcd"
  22. "yunion.io/x/onecloud/pkg/cloudcommon/informer"
  23. "yunion.io/x/onecloud/pkg/mcclient"
  24. )
  25. type SWatchManager struct {
  26. client *mcclient.Client
  27. region string
  28. interfaceType string
  29. watchBackend informer.IWatcher
  30. }
  31. func NewWatchManagerBySession(session *mcclient.ClientSession) (*SWatchManager, error) {
  32. return NewWatchManager(session.GetClient(), session.GetToken(), session.GetRegion(), session.GetEndpointType())
  33. }
  34. func NewWatchManagerBySessionBg(session *mcclient.ClientSession, callback func(man *SWatchManager) error) {
  35. go func() {
  36. for {
  37. watchMan, err := NewWatchManagerBySession(session)
  38. if err != nil {
  39. log.Errorf("NewWatchManagerBySession error: %v", err)
  40. } else {
  41. if err := callback(watchMan); err != nil {
  42. log.Warningf("callback with watchMan error: %v", err)
  43. } else {
  44. log.Infof("callback with watchMan success.")
  45. break
  46. }
  47. }
  48. time.Sleep(10 * time.Second)
  49. }
  50. }()
  51. }
  52. func NewWatchManager(client *mcclient.Client, token mcclient.TokenCredential, region, interfaceType string) (*SWatchManager, error) {
  53. endpoint, err := client.GetCommonEtcdEndpoint(token, region, interfaceType)
  54. if err != nil {
  55. return nil, errors.Wrap(err, "get common etcd endpoint")
  56. }
  57. tlsCfg, err := client.GetCommonEtcdTLSConfig(endpoint)
  58. if err != nil {
  59. return nil, errors.Wrap(err, "get common etcd tls config")
  60. }
  61. opt := &etcd.SEtcdOptions{
  62. EtcdEndpoint: []string{endpoint.Url},
  63. EtcdTimeoutSeconds: 5,
  64. EtcdRequestTimeoutSeconds: 10,
  65. EtcdLeaseExpireSeconds: 5,
  66. }
  67. if tlsCfg != nil {
  68. tlsCfg.InsecureSkipVerify = true
  69. opt.EtcdEnabldSsl = true
  70. opt.TLSConfig = tlsCfg
  71. }
  72. be, err := informer.NewEtcdBackendForClient(opt)
  73. if err != nil {
  74. return nil, errors.Wrap(err, "new etcd informer backend")
  75. }
  76. man := &SWatchManager{
  77. client: client,
  78. region: region,
  79. interfaceType: interfaceType,
  80. watchBackend: be,
  81. }
  82. return man, nil
  83. }
  84. type IResourceManager interface {
  85. KeyString() string
  86. GetKeyword() string
  87. }
  88. type EventHandler interface {
  89. OnAdd(obj *jsonutils.JSONDict)
  90. OnUpdate(oldObj, newObj *jsonutils.JSONDict)
  91. OnDelete(obj *jsonutils.JSONDict)
  92. }
  93. type IWatcher interface {
  94. AddEventHandler(ctx context.Context, handler EventHandler) error
  95. }
  96. type sWatcher struct {
  97. manager *SWatchManager
  98. resourceManager IResourceManager
  99. ctx context.Context
  100. eventHandler EventHandler
  101. }
  102. func (man *SWatchManager) For(resMan IResourceManager) IWatcher {
  103. watcher := &sWatcher{
  104. manager: man,
  105. resourceManager: resMan,
  106. }
  107. return watcher
  108. }
  109. func (w *sWatcher) AddEventHandler(ctx context.Context, handler EventHandler) error {
  110. w.ctx = ctx
  111. w.eventHandler = w.wrapEventHandler(handler)
  112. return w.manager.watch(w.ctx, w.resourceManager, w.eventHandler)
  113. }
  114. func (man *SWatchManager) watch(ctx context.Context, resMan IResourceManager, handler informer.ResourceEventHandler) error {
  115. return man.watchBackend.Watch(ctx, resMan.KeyString(), handler)
  116. }
  117. func (w *sWatcher) wrapEventHandler(handler EventHandler) informer.ResourceEventHandler {
  118. return &wrapEventHandler{handler}
  119. }
  120. type wrapEventHandler struct {
  121. handler EventHandler
  122. }
  123. func (h *wrapEventHandler) OnAdd(obj *jsonutils.JSONDict) {
  124. h.handler.OnAdd(obj)
  125. }
  126. func (h *wrapEventHandler) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
  127. h.handler.OnUpdate(oldObj, newObj)
  128. }
  129. func (h *wrapEventHandler) OnDelete(obj *jsonutils.JSONDict) {
  130. h.handler.OnDelete(obj)
  131. }
  132. type EventHandlerFuncs struct {
  133. AddFunc func(obj *jsonutils.JSONDict)
  134. UpdateFunc func(oldObj, newObj *jsonutils.JSONDict)
  135. DeleteFunc func(obj *jsonutils.JSONDict)
  136. }
  137. func (r EventHandlerFuncs) OnAdd(obj *jsonutils.JSONDict) {
  138. if r.AddFunc != nil {
  139. r.AddFunc(obj)
  140. }
  141. }
  142. func (r EventHandlerFuncs) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
  143. if r.UpdateFunc != nil {
  144. r.UpdateFunc(oldObj, newObj)
  145. }
  146. }
  147. func (r EventHandlerFuncs) OnDelete(obj *jsonutils.JSONDict) {
  148. if r.DeleteFunc != nil {
  149. r.DeleteFunc(obj)
  150. }
  151. }
  152. type FilteringEventHandler struct {
  153. FilterFunc func(obj *jsonutils.JSONDict) bool
  154. Handler EventHandler
  155. }
  156. func (r FilteringEventHandler) OnAdd(obj *jsonutils.JSONDict) {
  157. if !r.FilterFunc(obj) {
  158. return
  159. }
  160. r.Handler.OnAdd(obj)
  161. }
  162. func (r FilteringEventHandler) OnUpdate(oldObj, newObj *jsonutils.JSONDict) {
  163. newer := r.FilterFunc(newObj)
  164. older := r.FilterFunc(oldObj)
  165. switch {
  166. case newer && older:
  167. r.Handler.OnUpdate(oldObj, newObj)
  168. case newer && !older:
  169. r.Handler.OnAdd(newObj)
  170. case !newer && older:
  171. r.Handler.OnDelete(oldObj)
  172. default:
  173. // do nothing
  174. }
  175. }
  176. func (r FilteringEventHandler) OnDelete(obj *jsonutils.JSONDict) {
  177. if !r.FilterFunc(obj) {
  178. return
  179. }
  180. r.Handler.OnDelete(obj)
  181. }