apihelper.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package apihelper
  15. import (
  16. "context"
  17. "net/http"
  18. "sync"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/httputils"
  24. api "yunion.io/x/onecloud/pkg/apis/notify"
  25. "yunion.io/x/onecloud/pkg/appsrv"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  27. "yunion.io/x/onecloud/pkg/mcclient"
  28. "yunion.io/x/onecloud/pkg/mcclient/auth"
  29. npk "yunion.io/x/onecloud/pkg/mcclient/modules/notify"
  30. )
const (
	// MinSyncIntervalSeconds is the lower bound, in seconds, that
	// getSyncInterval applies to the configured sync interval.
	MinSyncIntervalSeconds = 10
	// MinRunDelayMilliseconds is the lower bound, in milliseconds, that
	// getRunDelay applies to the configured run delay.
	MinRunDelayMilliseconds = 100
)
// APIHelper periodically synchronizes a collection of model sets and
// publishes a copy of them on a channel whenever a sync reports a change.
type APIHelper struct {
	// opts supplies the sync interval, the run delay and the region used
	// when an admin session is created.
	opts *Options
	// modelSets is the most recently synced state; doSync replaces it with
	// the synced copy after SyncModelSets returns without error.
	modelSets IModelSets
	// modelSetsCh carries copies of modelSets to the consumer returned by
	// ModelSets. NewAPIHelper creates it unbuffered.
	modelSetsCh chan IModelSets
	// mcclientSession caches the admin session between syncs; see
	// adminClientSession for the renewal rule.
	mcclientSession *mcclient.ClientSession
	// tick drives the periodic sync in Start. It is nil until Start has
	// finished its initial run and again after Start returns.
	tick *time.Timer
}
  42. func NewAPIHelper(opts *Options, modelSets IModelSets) (*APIHelper, error) {
  43. modelSetsCh := make(chan IModelSets)
  44. helper := &APIHelper{
  45. opts: opts,
  46. modelSets: modelSets,
  47. modelSetsCh: modelSetsCh,
  48. }
  49. return helper, nil
  50. }
  51. func (h *APIHelper) getSyncInterval() time.Duration {
  52. intv := h.opts.SyncIntervalSeconds
  53. if intv < MinSyncIntervalSeconds {
  54. intv = MinSyncIntervalSeconds
  55. }
  56. return time.Duration(intv) * time.Second
  57. }
  58. func (h *APIHelper) getRunDelay() time.Duration {
  59. delay := h.opts.RunDelayMilliseconds
  60. if delay < MinRunDelayMilliseconds {
  61. delay = MinRunDelayMilliseconds
  62. }
  63. return time.Duration(delay) * time.Millisecond
  64. }
  65. func (h *APIHelper) addSyncHandler(app *appsrv.Application, prefix string) {
  66. path := httputils.JoinPath(prefix, "sync")
  67. app.AddHandler("POST", path, h.handlerSync)
  68. }
// handlerSync is the HTTP handler registered by addSyncHandler. It asks for
// an early sync via scheduleSync and does not itself write anything to w or
// read anything from r.
func (h *APIHelper) handlerSync(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	h.scheduleSync()
}
  72. func (h *APIHelper) Start(ctx context.Context, app *appsrv.Application, prefix string) {
  73. defer func() {
  74. log.Infoln("apihelper: bye")
  75. wg := ctx.Value("wg").(*sync.WaitGroup)
  76. wg.Done()
  77. }()
  78. if app != nil {
  79. h.addSyncHandler(app, prefix)
  80. }
  81. h.run(ctx)
  82. tickDuration := h.getSyncInterval()
  83. h.tick = time.NewTimer(tickDuration)
  84. defer func() {
  85. tick := h.tick
  86. h.tick = nil
  87. tick.Stop()
  88. }()
  89. for {
  90. select {
  91. case <-h.tick.C:
  92. h.run(ctx)
  93. h.tick.Reset(tickDuration)
  94. case <-ctx.Done():
  95. return
  96. }
  97. }
  98. }
  99. func (h *APIHelper) scheduleSync() {
  100. if h.tick != nil {
  101. if !h.tick.Stop() {
  102. <-h.tick.C
  103. }
  104. h.tick.Reset(h.getRunDelay())
  105. }
  106. }
// ModelSets returns the receive-only end of the channel on which run sends
// a copy of the model sets each time a sync reports a change.
func (h *APIHelper) ModelSets() <-chan IModelSets {
	return h.modelSetsCh
}
// RunManually performs one sync pass immediately in the caller's goroutine
// by invoking run. If the sync reports a change, the call blocks until the
// copy is received from the ModelSets channel or ctx is done.
func (h *APIHelper) RunManually(ctx context.Context) {
	h.run(ctx)
}
  113. func (h *APIHelper) run(ctx context.Context) {
  114. changed, err := h.doSync(ctx)
  115. if err != nil {
  116. log.Errorf("doSync error: %v", err)
  117. }
  118. if changed {
  119. mssCopy := h.modelSets.CopyJoined()
  120. select {
  121. case h.modelSetsCh <- mssCopy:
  122. case <-ctx.Done():
  123. }
  124. }
  125. }
  126. func (h *APIHelper) doSync(ctx context.Context) (changed bool, err error) {
  127. {
  128. stime := time.Now()
  129. defer func() {
  130. elapsed := time.Since(stime)
  131. log.Infof("sync data done, changed: %v, elapsed: %s", changed, elapsed.String())
  132. }()
  133. }
  134. s := h.adminClientSession(ctx)
  135. mss := h.modelSets.Copy()
  136. r, err := SyncModelSets(mss, s, h.opts)
  137. if err != nil {
  138. return false, errors.Wrap(err, "SyncModelSets")
  139. }
  140. h.modelSets = mss
  141. if !r.Correct {
  142. // 发送消息通知
  143. err := sendSyncErrNotify(s)
  144. if err != nil {
  145. log.Errorf("unable to EventNotify: %s", err)
  146. }
  147. return false, errors.Errorf("sync error")
  148. }
  149. changed = r.Changed
  150. return changed, nil
  151. }
  152. func (h *APIHelper) adminClientSession(ctx context.Context) *mcclient.ClientSession {
  153. s := h.mcclientSession
  154. if s != nil {
  155. token := s.GetToken()
  156. expires := token.GetExpires()
  157. if time.Now().Add(time.Hour).Before(expires) {
  158. return s
  159. }
  160. }
  161. region := h.opts.CommonOptions.Region
  162. h.mcclientSession = auth.GetAdminSession(ctx, region)
  163. return h.mcclientSession
  164. }
  165. func sendSyncErrNotify(s *mcclient.ClientSession) error {
  166. params := api.NotificationManagerEventNotifyInput{}
  167. params.Event = api.Event.WithAction(api.ActionNetOutOfSync).WithResourceType(api.TOPIC_RESOURCE_NET).String()
  168. params.AdvanceDays = 0
  169. message := &jsonutils.JSONDict{}
  170. message.Add(jsonutils.NewString(consts.GetServiceType()), "service_name")
  171. params.ResourceDetails = message
  172. _, err := npk.Notification.PerformClassAction(s, "event-notify", jsonutils.Marshal(params))
  173. return err
  174. }